Skip to content

Commit

Permalink
Namespaced scopes (#167)
Browse files Browse the repository at this point in the history
* Progress towards adding function expressions to the engine

* Added function support

* Fixes.

* Added workflow test cases with more complex expressions

* Addressed review feedback

* Remove unnecessary reference to the function names in descriptions

* More description changes

* Manually add special cases to prevent platform-specific behavior

* Added negative zeros to test cases

* Address review comments

* Change case

* Address review comments

* Address review comments

* Fix linting error and address review comment.

* Change descriptions for functions

* Address review comment

* Added namespaced scopes for step namespaces in input

* Use default namespace constant

* Apply scopes from sub-workflows

* Update invalid serialization detector for changes in SDK

* Fix linting errors

* Improved code and added initial tests for namespaced scopes

* Added unit tests for new ApplyScope functions

* Added integration tests for namespaced scoped workflows

* Added namespaces for referenced items in scopes

* Upgrade to go SDK pre-release

* Addressed review comments

* Clarify stage IDs in comment examples

* Addressed review comments
  • Loading branch information
jaredoconnell authored Apr 17, 2024
1 parent eccf155 commit 1a8ccf7
Show file tree
Hide file tree
Showing 7 changed files with 732 additions and 10 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ require (
go.flow.arcalot.io/dockerdeployer v0.6.1
go.flow.arcalot.io/expressions v0.4.1
go.flow.arcalot.io/kubernetesdeployer v0.9.1
go.flow.arcalot.io/pluginsdk v0.9.0-beta1
go.flow.arcalot.io/pluginsdk v0.10.0-beta1
go.flow.arcalot.io/podmandeployer v0.9.0
go.flow.arcalot.io/pythondeployer v0.6.0
go.flow.arcalot.io/testdeployer v0.6.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -143,8 +143,8 @@ go.flow.arcalot.io/expressions v0.4.1 h1:WOl3DtDcWAmPKupwYxJV3bVYKPoMgAmQbECfiUg
go.flow.arcalot.io/expressions v0.4.1/go.mod h1:FA/50wX1+0iTgW/dFeeE1yOslZSmfBaMNR4IiMYRwxc=
go.flow.arcalot.io/kubernetesdeployer v0.9.1 h1:AGnJFazehAENXxGMCF0Uc7aG9F0LpvuhoyQFu8deJG0=
go.flow.arcalot.io/kubernetesdeployer v0.9.1/go.mod h1:yvxT3VwmyrlIi4422pxl02z4QeU2Gvbjg5aQB17Ye4s=
go.flow.arcalot.io/pluginsdk v0.9.0-beta1 h1:tJwEp92vRJldHMff29Q8vfQB5a7FHe/nn6vyFTC1sik=
go.flow.arcalot.io/pluginsdk v0.9.0-beta1/go.mod h1:7HafTRTFTYRbJ4sS/Vn0CFrHlaBpEoyOX4oNf612XJM=
go.flow.arcalot.io/pluginsdk v0.10.0-beta1 h1:VGhGIJTZiAfb9J9NZGgMPFO4wb2i94PEiZ2e+ZkNz4A=
go.flow.arcalot.io/pluginsdk v0.10.0-beta1/go.mod h1:7HafTRTFTYRbJ4sS/Vn0CFrHlaBpEoyOX4oNf612XJM=
go.flow.arcalot.io/podmandeployer v0.9.0 h1:BSN/s8BeEUIIqOQm6/I5LOzAUzYFrVO1/3p1hcbWD4g=
go.flow.arcalot.io/podmandeployer v0.9.0/go.mod h1:FWwelCoH0jfQEYQAJ5mzLElZIXlRfyLP4osjcuQ6n30=
go.flow.arcalot.io/pythondeployer v0.6.0 h1:ptAurEJ2u2U127nK6Kk7zTelbkk6ipPqZcwnTmqB9vo=
Expand Down
14 changes: 10 additions & 4 deletions internal/util/invalid_serialization_detector.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func (d *InvalidSerializationDetectorSchema) UnserializeType(data any) (string,

// ValidateCompatibility ensures that the input data or schema is compatible with
// the given InvalidSerializationDetectorSchema.
func (d InvalidSerializationDetectorSchema) ValidateCompatibility(_ any) error {
func (d *InvalidSerializationDetectorSchema) ValidateCompatibility(_ any) error {
// For convenience, always return "success".
return nil
}
Expand Down Expand Up @@ -90,15 +90,21 @@ func (d *InvalidSerializationDetectorSchema) SerializeType(data string) (any, er
}

// ApplyScope is for applying a scope to the references. Does not apply to this object.
func (d InvalidSerializationDetectorSchema) ApplyScope(_ schema.Scope) {}
func (d *InvalidSerializationDetectorSchema) ApplyScope(_ schema.Scope, _ string) {}

// ValidateReferences validates that all necessary references from scopes have been applied.
// Does not apply to this object.
func (d *InvalidSerializationDetectorSchema) ValidateReferences() error {
return nil
}

// TypeID returns the category of type this type is. Returns string because
// the valid states of this type include all strings.
func (d InvalidSerializationDetectorSchema) TypeID() schema.TypeID {
func (d *InvalidSerializationDetectorSchema) TypeID() schema.TypeID {
return schema.TypeIDString // This is a subset of a string schema.
}

// ReflectedType returns the reflect.Type for a string.
func (d InvalidSerializationDetectorSchema) ReflectedType() reflect.Type {
func (d *InvalidSerializationDetectorSchema) ReflectedType() reflect.Type {
return reflect.TypeOf("")
}
8 changes: 6 additions & 2 deletions workflow/any.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,12 @@ func (a *anySchemaWithExpressions) Serialize(data any) (any, error) {
return a.checkAndConvert(data)
}

func (a *anySchemaWithExpressions) ApplyScope(scope schema.Scope) {
a.anySchema.ApplyScope(scope)
func (a *anySchemaWithExpressions) ApplyScope(scope schema.Scope, namespace string) {
a.anySchema.ApplyScope(scope, namespace)
}

func (a *anySchemaWithExpressions) ValidateReferences() error {
return a.anySchema.ValidateReferences()
}

func (a *anySchemaWithExpressions) TypeID() schema.TypeID {
Expand Down
92 changes: 91 additions & 1 deletion workflow/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,12 @@ func (e *executor) Prepare(workflow *Workflow, workflowContext map[string][]byte
return nil, err
}

// Apply step lifecycle objects to the input scope
err = applyLifecycleScopes(stepLifecycles, typedInput)
if err != nil {
return nil, err
}

// Stage 3: Construct an internal data model for the output data model
// provided by the steps. This is the schema the expressions evaluate
// against. You can use this to do static code analysis on the expressions.
Expand Down Expand Up @@ -221,7 +227,7 @@ func (e *executor) processInput(workflow *Workflow) (schema.Scope, error) {
if !ok {
return nil, fmt.Errorf("bug: unserialized input is not a scope")
}
typedInput.ApplyScope(typedInput)
typedInput.ApplyScope(typedInput, schema.DEFAULT_NAMESPACE)
return typedInput, nil
}

Expand Down Expand Up @@ -307,6 +313,90 @@ func (e *executor) processSteps(
return runnableSteps, stepOutputProperties, stepLifecycles, stepRunData, nil
}

func applyLifecycleScopes(
stepLifecycles map[string]step.Lifecycle[step.LifecycleStageWithSchema],
typedInput schema.Scope,
) error {
allNamespaces := make(map[string]schema.Scope)
for workflowStepID, stepLifecycle := range stepLifecycles {
for _, stage := range stepLifecycle.Stages {
prefix := "$.steps." + workflowStepID + "." + stage.ID + "."
// Apply inputs
// Example with stage "starting": $.steps.wait_step.starting.inputs.
addInputNamespacedScopes(allNamespaces, stage, prefix+"inputs.")
// Apply outputs
// Example with stage "outputs": $.steps.wait_step.outputs.outputs.
addOutputNamespacedScopes(allNamespaces, stage, prefix+"outputs.")
}
}
return applyAllNamespaces(allNamespaces, typedInput)
}

// applyAllNamespaces applies all namespaces to the given scope.
// This function also validates references, and lists the namespaces
// and their objects in the event of a reference failure.
func applyAllNamespaces(allNamespaces map[string]schema.Scope, scopeToApplyTo schema.Scope) error {
for namespace, scope := range allNamespaces {
scopeToApplyTo.ApplyScope(scope, namespace)
}
err := scopeToApplyTo.ValidateReferences()
if err == nil {
return nil // Success
}
// Now on the error path. Provide useful debug info.
availableObjects := ""
for namespace, scope := range allNamespaces {
availableObjects += "\n\t" + namespace + ":"
for objectID := range scope.Objects() {
availableObjects += " " + objectID
}
}
availableObjects += "\n" // Since this is a multi-line error message, ending with a newline is clearer.
return fmt.Errorf(
"error validating references for workflow input (%w)\nAvailable namespaces and objects:%s",
err,
availableObjects,
)
}

func addOutputNamespacedScopes(allNamespaces map[string]schema.Scope, stage step.LifecycleStageWithSchema, prefix string) {
for outputID, outputSchema := range stage.Outputs {
addScopesWithReferences(allNamespaces, outputSchema.Schema(), prefix+outputID)
}
}

func addInputNamespacedScopes(allNamespaces map[string]schema.Scope, stage step.LifecycleStageWithSchema, prefix string) {
for inputID, inputSchemaProperty := range stage.InputSchema {
var inputSchemaType = inputSchemaProperty.Type()
// Extract item values from lists (like for ForEach)
if inputSchemaType.TypeID() == schema.TypeIDList {
inputSchemaType = inputSchemaType.(schema.UntypedList).Items()
}
if inputSchemaType.TypeID() == schema.TypeIDScope {
addScopesWithReferences(allNamespaces, inputSchemaType.(schema.Scope), prefix+inputID)
}
}
}

// Adds the scope to the namespace map, as well as all resolved references from external namespaces.
func addScopesWithReferences(allNamespaces map[string]schema.Scope, scope schema.Scope, prefix string) {
// First, just adds the scope
allNamespaces[prefix] = scope
// Next, checks all properties for resolved references that reference objects outside of this scope.
rootObject := scope.Objects()[scope.Root()]
for propertyID, property := range rootObject.Properties() {
if property.Type().TypeID() == schema.TypeIDRef {
refProperty := property.Type().(schema.Ref)
if refProperty.Namespace() != schema.DEFAULT_NAMESPACE {
// Found a reference to an object that is not included in the scope. Add it to the map.
var referencedObject any = refProperty.GetObject()
refObjectSchema := referencedObject.(*schema.ObjectSchema)
allNamespaces[prefix+"."+propertyID] = schema.NewScopeSchema(refObjectSchema)
}
}
}
}

// connectStepDependencies connects the steps based on their expressions.
func (e *executor) connectStepDependencies(
workflow *Workflow,
Expand Down
Loading

0 comments on commit 1a8ccf7

Please sign in to comment.