Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Namespaced scopes #167

Merged
merged 33 commits into from
Apr 17, 2024
Merged
Show file tree
Hide file tree
Changes from 30 commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
ffb2827
Progress towards adding function expressions to the engine
jaredoconnell Feb 5, 2024
3b498cf
Added function support
jaredoconnell Feb 7, 2024
c36109c
Fixes.
jaredoconnell Feb 7, 2024
cc2a01e
Added workflow test cases with more complex expressions
jaredoconnell Feb 9, 2024
d872890
Addressed review feedback
jaredoconnell Feb 13, 2024
41137ae
Remove unnecessary reference to the function names in descriptions
jaredoconnell Feb 13, 2024
4c7f591
More description changes
jaredoconnell Feb 13, 2024
6c6b728
Manually add special cases to prevent platform-specific behavior
jaredoconnell Feb 14, 2024
3ffb44b
Added negative zeros to test cases
jaredoconnell Feb 14, 2024
f1500e4
Address review comments
jaredoconnell Feb 14, 2024
1422625
Merge branch 'main' into function-support
jaredoconnell Feb 14, 2024
1f491cd
Change case
jaredoconnell Feb 14, 2024
ad7bbaf
Address review comments
jaredoconnell Feb 15, 2024
b422120
Address review comments
jaredoconnell Feb 16, 2024
453a458
Fix linting error and address review comment.
jaredoconnell Feb 16, 2024
b01e7d8
Change descriptions for functions
jaredoconnell Feb 16, 2024
912c1b5
Address review comment
jaredoconnell Feb 16, 2024
3b3ee56
Added namespaced scopes for step namespaces in input
jaredoconnell Feb 21, 2024
05c45a2
Use default namespace constant
jaredoconnell Feb 22, 2024
ab751ad
Apply scopes from sub-workflows
jaredoconnell Feb 22, 2024
6cbcd4e
Merge branch 'main' into namespaced-scopes
jaredoconnell Mar 14, 2024
45cc29a
Update invalid serialization detector for changes in SDK
jaredoconnell Mar 14, 2024
f8480a8
Fix linting errors
jaredoconnell Mar 14, 2024
5fb8e94
Improved code and added initial tests for namespaced scopes
jaredoconnell Mar 22, 2024
52ede8c
Added unit tests for new ApplyScope functions
jaredoconnell Mar 29, 2024
1bd2e22
Added integration tests for namespaced scoped workflows
jaredoconnell Apr 3, 2024
f2875cf
Added namespaces for referenced items in scopes
jaredoconnell Apr 5, 2024
5b6d279
Merge branch 'main' into namespaced-scopes
jaredoconnell Apr 5, 2024
e5adf71
Upgrade to go SDK pre-release
jaredoconnell Apr 11, 2024
77c91db
Merge branch 'main' into namespaced-scopes
jaredoconnell Apr 12, 2024
e0555d8
Addressed review comments
jaredoconnell Apr 16, 2024
3dd60af
Clarify stage IDs in comment examples
jaredoconnell Apr 16, 2024
3cf14b6
Addressed review comments
jaredoconnell Apr 16, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ require (
go.flow.arcalot.io/dockerdeployer v0.6.1
go.flow.arcalot.io/expressions v0.4.1
go.flow.arcalot.io/kubernetesdeployer v0.9.1
go.flow.arcalot.io/pluginsdk v0.9.0-beta1
go.flow.arcalot.io/pluginsdk v0.10.0-beta1
go.flow.arcalot.io/podmandeployer v0.9.0
go.flow.arcalot.io/pythondeployer v0.6.0
go.flow.arcalot.io/testdeployer v0.6.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -143,8 +143,8 @@ go.flow.arcalot.io/expressions v0.4.1 h1:WOl3DtDcWAmPKupwYxJV3bVYKPoMgAmQbECfiUg
go.flow.arcalot.io/expressions v0.4.1/go.mod h1:FA/50wX1+0iTgW/dFeeE1yOslZSmfBaMNR4IiMYRwxc=
go.flow.arcalot.io/kubernetesdeployer v0.9.1 h1:AGnJFazehAENXxGMCF0Uc7aG9F0LpvuhoyQFu8deJG0=
go.flow.arcalot.io/kubernetesdeployer v0.9.1/go.mod h1:yvxT3VwmyrlIi4422pxl02z4QeU2Gvbjg5aQB17Ye4s=
go.flow.arcalot.io/pluginsdk v0.9.0-beta1 h1:tJwEp92vRJldHMff29Q8vfQB5a7FHe/nn6vyFTC1sik=
go.flow.arcalot.io/pluginsdk v0.9.0-beta1/go.mod h1:7HafTRTFTYRbJ4sS/Vn0CFrHlaBpEoyOX4oNf612XJM=
go.flow.arcalot.io/pluginsdk v0.10.0-beta1 h1:VGhGIJTZiAfb9J9NZGgMPFO4wb2i94PEiZ2e+ZkNz4A=
go.flow.arcalot.io/pluginsdk v0.10.0-beta1/go.mod h1:7HafTRTFTYRbJ4sS/Vn0CFrHlaBpEoyOX4oNf612XJM=
go.flow.arcalot.io/podmandeployer v0.9.0 h1:BSN/s8BeEUIIqOQm6/I5LOzAUzYFrVO1/3p1hcbWD4g=
go.flow.arcalot.io/podmandeployer v0.9.0/go.mod h1:FWwelCoH0jfQEYQAJ5mzLElZIXlRfyLP4osjcuQ6n30=
go.flow.arcalot.io/pythondeployer v0.6.0 h1:ptAurEJ2u2U127nK6Kk7zTelbkk6ipPqZcwnTmqB9vo=
Expand Down
14 changes: 10 additions & 4 deletions internal/util/invalid_serialization_detector.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func (d *InvalidSerializationDetectorSchema) UnserializeType(data any) (string,

// ValidateCompatibility ensures that the input data or schema is compatible with
// the given InvalidSerializationDetectorSchema.
func (d InvalidSerializationDetectorSchema) ValidateCompatibility(_ any) error {
func (d *InvalidSerializationDetectorSchema) ValidateCompatibility(_ any) error {
// For convenience, always return "success".
return nil
}
Expand Down Expand Up @@ -90,15 +90,21 @@ func (d *InvalidSerializationDetectorSchema) SerializeType(data string) (any, er
}

// ApplyScope is for applying a scope to the references. Does not apply to this object.
func (d InvalidSerializationDetectorSchema) ApplyScope(_ schema.Scope) {}
func (d *InvalidSerializationDetectorSchema) ApplyScope(_ schema.Scope, _ string) {}

// ValidateReferences validates that all necessary references from scopes have been applied.
// Does not apply to this object.
func (d *InvalidSerializationDetectorSchema) ValidateReferences() error {
return nil
}
dbutenhof marked this conversation as resolved.
Show resolved Hide resolved

// TypeID returns the category of type this type is. Returns string because
// the valid states of this type include all strings.
func (d InvalidSerializationDetectorSchema) TypeID() schema.TypeID {
func (d *InvalidSerializationDetectorSchema) TypeID() schema.TypeID {
return schema.TypeIDString // This is a subset of a string schema.
}

// ReflectedType returns the reflect.Type for a string.
func (d InvalidSerializationDetectorSchema) ReflectedType() reflect.Type {
func (d *InvalidSerializationDetectorSchema) ReflectedType() reflect.Type {
return reflect.TypeOf("")
}
8 changes: 6 additions & 2 deletions workflow/any.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,12 @@ func (a *anySchemaWithExpressions) Serialize(data any) (any, error) {
return a.checkAndConvert(data)
}

func (a *anySchemaWithExpressions) ApplyScope(scope schema.Scope) {
a.anySchema.ApplyScope(scope)
func (a *anySchemaWithExpressions) ApplyScope(scope schema.Scope, namespace string) {
a.anySchema.ApplyScope(scope, namespace)
}

func (a *anySchemaWithExpressions) ValidateReferences() error {
return a.anySchema.ValidateReferences()
}

func (a *anySchemaWithExpressions) TypeID() schema.TypeID {
Expand Down
92 changes: 91 additions & 1 deletion workflow/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,12 @@ func (e *executor) Prepare(workflow *Workflow, workflowContext map[string][]byte
return nil, err
}

// Apply step lifecycle objects to the input scope
err = applyLifecycleScopes(stepLifecycles, typedInput)
if err != nil {
return nil, err
}

// Stage 3: Construct an internal data model for the output data model
// provided by the steps. This is the schema the expressions evaluate
// against. You can use this to do static code analysis on the expressions.
Expand Down Expand Up @@ -221,7 +227,7 @@ func (e *executor) processInput(workflow *Workflow) (schema.Scope, error) {
if !ok {
return nil, fmt.Errorf("bug: unserialized input is not a scope")
}
typedInput.ApplyScope(typedInput)
typedInput.ApplyScope(typedInput, schema.DEFAULT_NAMESPACE)
return typedInput, nil
}

Expand Down Expand Up @@ -307,6 +313,90 @@ func (e *executor) processSteps(
return runnableSteps, stepOutputProperties, stepLifecycles, stepRunData, nil
}

func applyLifecycleScopes(
stepLifecycles map[string]step.Lifecycle[step.LifecycleStageWithSchema],
typedInput schema.Scope,
) error {
allNamespaces := make(map[string]schema.Scope)
for workflowStepID, stepLifecycle := range stepLifecycles {
// The format will be $.steps.step_name.stage.<input|outputs.output_id>
// First, get the input schema and output schemas, and apply them.
webbnh marked this conversation as resolved.
Show resolved Hide resolved
for _, stage := range stepLifecycle.Stages {
prefix := "$.steps." + workflowStepID + "." + stage.ID + "."
// Apply inputs
// Example: $.steps.wait_step.starting.input
addInputNamespacedScopes(allNamespaces, stage, prefix+"inputs.")
// Apply outputs
// Example: $.steps.wait_step.outputs.success
addOutputNamespacedScopes(allNamespaces, stage, prefix+"outputs.")
webbnh marked this conversation as resolved.
Show resolved Hide resolved
}
}
return applyAllNamespaces(allNamespaces, typedInput)
}

func applyAllNamespaces(allNamespaces map[string]schema.Scope, scopeToApplyTo schema.Scope) error {
// Just apply all scopes
for namespace, scope := range allNamespaces {
scopeToApplyTo.ApplyScope(scope, namespace)
}
// Validate references. If that fails, provide a useful error message to the user.
webbnh marked this conversation as resolved.
Show resolved Hide resolved
err := scopeToApplyTo.ValidateReferences()
if err == nil {
return nil // Success
}
// Without listing the namespaces, it may be hard to debug a workflow, so add that list to the error.
availableObjects := ""
for namespace, scope := range allNamespaces {
availableObjects += "\n" + namespace + ":"
for objectID := range scope.Objects() {
availableObjects += " " + objectID
}
}
return fmt.Errorf("error validating references for workflow input (%w)\nAvailable namespaces and objects:%s", err, availableObjects)
webbnh marked this conversation as resolved.
Show resolved Hide resolved
}

func addOutputNamespacedScopes(allNamespaces map[string]schema.Scope, stage step.LifecycleStageWithSchema, prefix string) {
for outputID, outputSchema := range stage.Outputs {
addScopesWithReferences(allNamespaces, outputSchema.Schema(), prefix+outputID)
}
}

func addInputNamespacedScopes(allNamespaces map[string]schema.Scope, stage step.LifecycleStageWithSchema, prefix string) {
for inputID, inputSchema := range stage.InputSchema {
switch inputSchema.TypeID() {
case schema.TypeIDScope:
addScopesWithReferences(allNamespaces, inputSchema.Type().(schema.Scope), prefix+inputID)
case schema.TypeIDList:
// foreach is a list, for example.
listSchema := inputSchema.Type().(schema.UntypedList)
if listSchema.Items().TypeID() == schema.TypeIDScope {
// Apply list item type since it's a scope.
itemScope := listSchema.Items().(schema.Scope)
addScopesWithReferences(allNamespaces, itemScope, prefix+inputID)
}
}
webbnh marked this conversation as resolved.
Show resolved Hide resolved
}
}

// Adds the scope to the namespace map, as well as all resolved references from external namespaces.
func addScopesWithReferences(allNamespaces map[string]schema.Scope, scope schema.Scope, prefix string) {
// First, just adds the scope
allNamespaces[prefix] = scope
// Next, checks all properties for resolved references that reference objects outside of this scope.
rootObject := scope.Objects()[scope.Root()]
for propertyID, property := range rootObject.Properties() {
if property.Type().TypeID() == schema.TypeIDRef {
refProperty := property.Type().(schema.Ref)
if refProperty.Namespace() != schema.DEFAULT_NAMESPACE && refProperty.ObjectReady() {
// Found a resolved reference with an object that is not included in the scope. Add it to the map.
webbnh marked this conversation as resolved.
Show resolved Hide resolved
var referencedObject any = refProperty.GetObject()
refObjectSchema := referencedObject.(*schema.ObjectSchema)
allNamespaces[prefix+"."+propertyID] = schema.NewScopeSchema(refObjectSchema)
}
}
}
}

// connectStepDependencies connects the steps based on their expressions.
func (e *executor) connectStepDependencies(
workflow *Workflow,
Expand Down
Loading