Skip to content

Commit

Permalink
Merge branch 'master' into dashboard-taskrun-json
Browse files Browse the repository at this point in the history
  • Loading branch information
HazimAr authored Nov 13, 2024
2 parents 033bae9 + 32c5f29 commit b1d635c
Show file tree
Hide file tree
Showing 36 changed files with 646 additions and 125 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ lhctl/lhctl
local-dev/certs/
build/
.config
*.pem

# Python
__pycache__
Expand Down
20 changes: 20 additions & 0 deletions docs/docs/06-operations/01-server-configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -478,6 +478,26 @@ The number of threads to execute stream processing in the Core Topology. [Kafka

---

### `LHS_KAFKA_TRANSACTION_TIMEOUT_MS`

The transaction timeout configured for the Core Topology producer.

- **Type:** int, >= 1
- **Default:** 60000
- **Importance:** medium

---

### `LHS_CORE_KS_CONFIG_`

Any configurations prefixed with this prefix will be appended to the Kafka Streams configuration properties for the Core Topology. For example, setting `LHS_CORE_KS_CONFIG_RESTORE_CONSUMER_CLIENT_RACK` would set the `restore.consumer.client.rack` configuration.

- **Type:** any
- **Default:** none
- **Importance:** low

---

### `LHS_TIMER_STREAM_THREADS`

The number of threads to execute stream processing in the Timer Topology. [Kafka Official](https://kafka.apache.org/documentation/#streamsconfigs_num.stream.threads). For a server with `N` cores, we recommend setting this to `N * 0.4`.
Expand Down
2 changes: 2 additions & 0 deletions local-dev/configs/server-1.config
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ LHS_SHOULD_CREATE_TOPICS=true
LHS_CORE_STREAM_THREADS=2
LHS_STREAMS_METRICS_LEVEL=info

LHS_STREAMS_TRANSACTION_TIMEOUT_MS=10000

LHS_HEALTH_SERVICE_PORT=1822

LHS_INTERNAL_BIND_PORT=2011
Expand Down
13 changes: 13 additions & 0 deletions sdk-dotnet/Examples/MaskedFieldsExample/MaskedFieldsExample.csproj
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
<Project Sdk="Microsoft.NET.Sdk.Worker">

<PropertyGroup>
<TargetFramework>net8.0</TargetFramework>
<Nullable>enable</Nullable>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
</PropertyGroup>

<ItemGroup>
<ProjectReference Include="..\..\LittleHorse.Sdk\LittleHorse.Sdk.csproj" />
</ItemGroup>
</Project>
56 changes: 56 additions & 0 deletions sdk-dotnet/Examples/MaskedFieldsExample/Program.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
using Examples.BasicExample;
using LittleHorse.Sdk;
using LittleHorse.Sdk.Worker;

public class Program
{
private static ServiceProvider? _serviceProvider;
private static void SetupApplication()
{
_serviceProvider = new ServiceCollection()
.AddLogging(config =>
{
config.AddConsole();
config.SetMinimumLevel(LogLevel.Debug);
})
.BuildServiceProvider();
}

private static LHConfig GetLHConfig(string[] args, ILoggerFactory loggerFactory)
{
var config = new LHConfig(loggerFactory);

string filePath = Path.Combine(Directory.GetCurrentDirectory(), ".config/littlehorse.config");
if (File.Exists(filePath))
config = new LHConfig(filePath, loggerFactory);

return config;
}

static void Main(string[] args)
{
SetupApplication();
if (_serviceProvider != null)
{
var loggerFactory = _serviceProvider.GetRequiredService<ILoggerFactory>();
var config = GetLHConfig(args, loggerFactory);

MyWorker executableCreateGreet = new MyWorker();
var taskWorkerCreate = new LHTaskWorker<MyWorker>(executableCreateGreet, "create-greet", config);
MyWorker executableUpdateGreet = new MyWorker();
var taskWorkerUpdate = new LHTaskWorker<MyWorker>(executableUpdateGreet, "update-greet", config);
MyWorker executableDeleteGreet = new MyWorker();
var taskWorkerDelete = new LHTaskWorker<MyWorker>(executableDeleteGreet, "delete-greet", config);

taskWorkerCreate.RegisterTaskDef();
taskWorkerUpdate.RegisterTaskDef();
taskWorkerDelete.RegisterTaskDef();

Thread.Sleep(1000);

taskWorkerCreate.Start();
taskWorkerUpdate.Start();
taskWorkerDelete.Start();
}
}
}
32 changes: 32 additions & 0 deletions sdk-dotnet/Examples/MaskedFieldsExample/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
## Running MaskedFields Example

This is a simple example, that masks input params and output results in LH Task Methods.

Let's run the example in `MaskedFieldsExample`

```
dotnet build
dotnet run
```

In another terminal, use `lhctl` to run the workflow:

```
# The "masked-name" variable should mask the value
# And the input-name variable value will mantain the original plain text
lhctl run example-basic masked-name pii-info input-name foo
```

In addition, you can check the result with:

```
# This call shows the result
lhctl get wfRun <wf_run_id>
# This will show you all nodes in tha run
lhctl list nodeRun <wf_run_id>
# This shows the task run information
lhctl get taskRun <wf_run_id> <task_run_global_id>
```
32 changes: 32 additions & 0 deletions sdk-dotnet/Examples/MaskedFieldsExample/Worker.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
using LittleHorse.Sdk.Worker;

namespace Examples.BasicExample
{
public class MyWorker
{
[LHTaskMethod("create-greet")]
[LHType(masked: true)]
public string CreateGreeting([LHType(masked: true)] string name)
{
var message = $"Hello team, This is a New Greeting for {name}";
Console.WriteLine($"Executing task create greet {name}");
return message;
}

[LHTaskMethod("update-greet")]
public string UpdateGreeting([LHType(masked: true)] string name)
{
var message = $"Hello team, This is Greeting Modification {name}";
Console.WriteLine($"Executing task update greet {name}");
return message;
}

[LHTaskMethod("delete-greet")]
public string DeleteGreeting(string name)
{
var message = $"Hello team, This is a Greeting Deletion {name}";
Console.WriteLine($"Executing task delete greet {name}");
return message;
}
}
}
6 changes: 4 additions & 2 deletions sdk-go/littlehorse/wf_lib_internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,13 @@ package littlehorse

import (
"errors"
"github.com/littlehorse-enterprises/littlehorse/sdk-go/lhproto"
"log"
"strconv"
"strings"
"unicode"

"github.com/littlehorse-enterprises/littlehorse/sdk-go/lhproto"

"github.com/ztrue/tracerr"
)

Expand Down Expand Up @@ -745,7 +746,8 @@ func (t *WorkflowThread) addVariable(
}

threadVarDef := &lhproto.ThreadVarDef{
VarDef: varDef,
VarDef: varDef,
AccessLevel: lhproto.WfRunVariableAccessLevel_PRIVATE_VAR,
}

t.spec.VariableDefs = append(t.spec.VariableDefs, threadVarDef)
Expand Down
13 changes: 7 additions & 6 deletions sdk-go/littlehorse/wf_lib_internal_test.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
package littlehorse_test

import (
"testing"

"github.com/littlehorse-enterprises/littlehorse/sdk-go/lhproto"
"github.com/littlehorse-enterprises/littlehorse/sdk-go/littlehorse"
"testing"

"github.com/stretchr/testify/assert"
)
Expand Down Expand Up @@ -538,10 +539,10 @@ func TestJsonPath(t *testing.T) {

func TestVariableAccessLevel(t *testing.T) {
wf := littlehorse.NewWorkflow(func(t *littlehorse.WorkflowThread) {
inheritedVar := t.AddVariable("my-var", lhproto.VariableType_BOOL)
inheritedVar.WithAccessLevel(lhproto.WfRunVariableAccessLevel_PRIVATE_VAR)
publicVar := t.AddVariable("my-var", lhproto.VariableType_BOOL)
publicVar.AsPublic()

// Test that default is PUBLIC_VAR
// Test that default is PRIVATE_VAR
t.AddVariable("default-access", lhproto.VariableType_INT)

t.Execute("some-task")
Expand All @@ -550,11 +551,11 @@ func TestVariableAccessLevel(t *testing.T) {
putWf, _ := wf.Compile()
entrypoint := putWf.ThreadSpecs[putWf.EntrypointThreadName]
varDef := entrypoint.VariableDefs[0]
assert.Equal(t, varDef.AccessLevel, lhproto.WfRunVariableAccessLevel_PRIVATE_VAR)
assert.Equal(t, varDef.AccessLevel, lhproto.WfRunVariableAccessLevel_PUBLIC_VAR)
assert.Equal(t, varDef.VarDef.Name, "my-var")

varDef = entrypoint.VariableDefs[1]
assert.Equal(t, varDef.AccessLevel, lhproto.WfRunVariableAccessLevel_PUBLIC_VAR)
assert.Equal(t, varDef.AccessLevel, lhproto.WfRunVariableAccessLevel_PRIVATE_VAR)
assert.Equal(t, varDef.VarDef.Name, "default-access")
}

Expand Down
8 changes: 8 additions & 0 deletions sdk-go/littlehorse/wf_lib_public.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,14 @@ func (w *WfRunVariable) WithAccessLevel(accessLevel lhproto.WfRunVariableAccessL
return w.withAccessLevel(accessLevel)
}

func (w *WfRunVariable) AsPublic() WfRunVariable {
return w.withAccessLevel(lhproto.WfRunVariableAccessLevel_PUBLIC_VAR)
}

func (w *WfRunVariable) AsInherited() WfRunVariable {
return w.withAccessLevel(lhproto.WfRunVariableAccessLevel_INHERITED_VAR)
}

func (w *WfRunVariable) Searchable() *WfRunVariable {
return w.searchableImpl()
}
Expand Down
2 changes: 1 addition & 1 deletion sdk-java/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ dependencies {
implementation 'org.awaitility:awaitility:4.2.0'

// OAuth
implementation 'com.nimbusds:oauth2-oidc-sdk:10.9.2'
implementation 'com.nimbusds:oauth2-oidc-sdk:11.20.1'

// Lombok stuffs
compileOnly "org.projectlombok:lombok:${lombokVersion}"
Expand Down
30 changes: 26 additions & 4 deletions sdk-java/src/main/java/io/littlehorse/sdk/wfsdk/WfRunVariable.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,36 @@ public interface WfRunVariable extends Serializable {
WfRunVariable required();

/**
* Marks the Variable as "Searchable", which:
* - Creates an Index on the Variable in the LH Data Store
* - Due to the fact that the Variable is now Searchable, all future WfSpec
* versions must use the same Type for this Variable.
* Marks the Variable as "Searchable", which creates an Index on the Variable
* in the LH Data Store.
* @return same {@link WfRunVariable} instance
*/
WfRunVariable searchable();

/**
* Marks the Variable as a `PUBLIC_VAR`, which does three things:
* 1. Considers this variable in determining whether a new version of this WfSpec
* should be a major version or minor revision.
* 2. Freezes the type of this variable so that you cannot create future WfSpec
* versions with a variable of the same name and different type.
* 3. Allows defining child WfSpec's that use this variable.
*
* This is an advanced feature that you should use in any of the following cases:
* - You are treating a WfSpec as a data model and a WfRun as an instance of data.
* - You need child workflows to access this variable.
* @return same {@link WfRunVariable} instance
*/
WfRunVariable asPublic();

/**
* Marks the Variable as a `INHERITED_VAR`, which means that it comes from the
* parent `WfRun`. This means that:
* - There must be a parent WfSpec reference.
* - The parent must have a PUBLIC_VAR variable of the same name and type.
* @return same {@link WfRunVariable} instance
*/
WfRunVariable asInherited();

/**
* Marks the JSON_OBJ or JSON_ARR Variable as "Searchable", and creates an
* index on the specified field.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ public WfRunVariableImpl(String name, Object typeOrDefaultVal) {
this.name = name;
this.typeOrDefaultVal = typeOrDefaultVal;

// This is the default zero value.
this.accessLevel = WfRunVariableAccessLevel.PUBLIC_VAR;
// As per GH Issue #582, the default is now PRIVATE_VAR.
this.accessLevel = WfRunVariableAccessLevel.PRIVATE_VAR;
initializeType();
}

Expand Down Expand Up @@ -122,4 +122,14 @@ public ThreadVarDef getSpec() {
.setAccessLevel(accessLevel)
.build();
}

@Override
public WfRunVariableImpl asPublic() {
return this.withAccessLevel(WfRunVariableAccessLevel.PUBLIC_VAR);
}

@Override
public WfRunVariable asInherited() {
return this.withAccessLevel(WfRunVariableAccessLevel.INHERITED_VAR);
}
}
2 changes: 1 addition & 1 deletion sdk-python/littlehorse/model/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@
from .node_run_pb2 import *
from .object_id_pb2 import *
from .scheduled_wf_run_pb2 import *
from .service_pb2 import *
from .service_pb2_grpc import *
from .service_pb2 import *
from .task_def_pb2 import *
from .task_run_pb2 import *
from .user_tasks_pb2 import *
Expand Down
25 changes: 24 additions & 1 deletion sdk-python/littlehorse/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
WfRunVariableAccessLevel,
WorkflowRetentionPolicy,
)
from littlehorse.model.wf_spec_pb2 import PRIVATE_VAR
from littlehorse.utils import negate_comparator, to_variable_value
from littlehorse.worker import _create_task_def

Expand Down Expand Up @@ -364,6 +365,28 @@ def with_json_path(self, json_path: str) -> "WfRunVariable":
out.json_path = json_path
return out

def as_public(self) -> "WfRunVariable":
"""Sets the access level to PUBLIC_VAR, which has three implications:
- Future versions of this WfSpec cannot define a variable with the
same name and a different type.
- Child workflows can access this variable.
- This variable is now considered in determining whether a new
version of the WfSpec is a majorVersion or revision.
"""
self._access_level = WfRunVariableAccessLevel.PUBLIC_VAR
return self

def as_inherited(self) -> "WfRunVariable":
"""Sets the access level to INHERITED_VAR, which has three implications:
- Future versions of this WfSpec cannot define a variable with the
same name and a different type.
- Child workflows can access this variable.
- This variable is now considered in determining whether a new
version of the WfSpec is a majorVersion or revision.
"""
self._access_level = WfRunVariableAccessLevel.INHERITED_VAR
return self

def with_access_level(
self, access_level: WfRunVariableAccessLevel
) -> "WfRunVariable":
Expand Down Expand Up @@ -1474,7 +1497,7 @@ def add_variable(
self,
variable_name: str,
variable_type: VariableType,
access_level: Optional[Union[WfRunVariableAccessLevel, str]] = None,
access_level: Optional[Union[WfRunVariableAccessLevel, str]] = PRIVATE_VAR,
default_value: Any = None,
) -> WfRunVariable:
"""Defines a Variable in the ThreadSpec and returns a handle to it.
Expand Down
Loading

0 comments on commit b1d635c

Please sign in to comment.