Skip to content

Commit

Permalink
controller: reconcile func returns an error
Browse files Browse the repository at this point in the history
Makes possible to abort reconciliation workflows by making a precondition task or (alternatively any of the concurrent tasks to return an error.

Signed-off-by: Guilherme Cassolato <[email protected]>
  • Loading branch information
guicassolato committed Sep 10, 2024
1 parent 0e50bb8 commit 350512b
Show file tree
Hide file tree
Showing 16 changed files with 234 additions and 28 deletions.
10 changes: 7 additions & 3 deletions controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,8 @@ func WithRunnable(name string, builder RunnableBuilder) ControllerOption {
// It receives a list of recent events, an immutable copy of the topology as known by the caller after the events,
// an optional error detected before the reconciliation, and a thread-safe map to store transient state across
// chained calls to multiple ReconcileFuncs.
type ReconcileFunc func(context.Context, []ResourceEvent, *machinery.Topology, error, *sync.Map)
// If a ReconcileFunc returns an error, a chained sequence of ReconcileFuncs must be interrupted.
type ReconcileFunc func(context.Context, []ResourceEvent, *machinery.Topology, error, *sync.Map) error

func WithReconcile(reconcile ReconcileFunc) ControllerOption {
return func(o *ControllerOptions) {
Expand Down Expand Up @@ -110,7 +111,8 @@ func NewController(f ...ControllerOption) *Controller {
name: "controller",
logger: logr.Discard(),
runnables: map[string]RunnableBuilder{},
reconcile: func(context.Context, []ResourceEvent, *machinery.Topology, error, *sync.Map) {
reconcile: func(context.Context, []ResourceEvent, *machinery.Topology, error, *sync.Map) error {
return nil
},
}
for _, fn := range f {
Expand Down Expand Up @@ -254,7 +256,9 @@ func (c *Controller) propagate(resourceEvents []ResourceEvent) {
if err != nil {
c.logger.Error(err, "error building topology")
}
c.reconcile(LoggerIntoContext(context.TODO(), c.logger), resourceEvents, topology, err, &sync.Map{})
if err := c.reconcile(LoggerIntoContext(context.TODO(), c.logger), resourceEvents, topology, err, &sync.Map{}); err != nil {
c.logger.Error(err, "reconciliation error")
}
}

func (c *Controller) subscribe() {
Expand Down
3 changes: 2 additions & 1 deletion controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ func TestControllerOptions(t *testing.T) {
name: "controller",
logger: logr.Discard(),
runnables: map[string]RunnableBuilder{},
reconcile: func(context.Context, []ResourceEvent, *machinery.Topology, error, *sync.Map) {
reconcile: func(context.Context, []ResourceEvent, *machinery.Topology, error, *sync.Map) error {
return nil
},
}

Expand Down
5 changes: 3 additions & 2 deletions controller/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ type Subscription struct {
Events []ResourceEventMatcher
}

func (s Subscription) Reconcile(ctx context.Context, resourceEvents []ResourceEvent, topology *machinery.Topology, err error, state *sync.Map) {
func (s Subscription) Reconcile(ctx context.Context, resourceEvents []ResourceEvent, topology *machinery.Topology, err error, state *sync.Map) error {
matchingEvents := lo.Filter(resourceEvents, func(resourceEvent ResourceEvent, _ int) bool {
return lo.ContainsBy(s.Events, func(m ResourceEventMatcher) bool {
obj := resourceEvent.OldObject
Expand All @@ -31,6 +31,7 @@ func (s Subscription) Reconcile(ctx context.Context, resourceEvents []ResourceEv
})
})
if len(matchingEvents) > 0 && s.ReconcileFunc != nil {
s.ReconcileFunc(ctx, matchingEvents, topology, err, state)
return s.ReconcileFunc(ctx, matchingEvents, topology, err, state)
}
return nil
}
3 changes: 2 additions & 1 deletion controller/test_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func init() {
Func: func(_ machinery.Object) []machinery.Object { return []machinery.Object{&RuntimeObject{myObjects[0]}} },
}
}
testReconcileFunc = func(_ context.Context, events []ResourceEvent, topology *machinery.Topology, err error, _ *sync.Map) {
testReconcileFunc = func(_ context.Context, events []ResourceEvent, topology *machinery.Topology, err error, _ *sync.Map) error {
for _, event := range events {
testLogger.Info("reconcile",
"kind", event.Kind,
Expand All @@ -62,6 +62,7 @@ func init() {
"objects", len(topology.Objects().Items()),
)
}
return nil
}
testScheme = runtime.NewScheme()
corev1.AddToScheme(testScheme)
Expand Down
31 changes: 19 additions & 12 deletions controller/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"context"
"sync"

"golang.org/x/sync/errgroup"

"github.com/kuadrant/policy-machinery/machinery"
)

Expand All @@ -15,26 +17,31 @@ type Workflow struct {
Postcondition ReconcileFunc
}

func (d *Workflow) Run(ctx context.Context, resourceEvents []ResourceEvent, topology *machinery.Topology, err error, state *sync.Map) {
func (d *Workflow) Run(ctx context.Context, resourceEvents []ResourceEvent, topology *machinery.Topology, err error, state *sync.Map) error {
// run precondition reconcile function
if d.Precondition != nil {
d.Precondition(ctx, resourceEvents, topology, err, state)
if err := d.Precondition(ctx, resourceEvents, topology, err, state); err != nil {
return err
}
}

// dispatch the event to concurrent tasks
funcs := d.Tasks
waitGroup := &sync.WaitGroup{}
waitGroup.Add(len(funcs))
for _, f := range funcs {
go func() {
defer waitGroup.Done()
f(ctx, resourceEvents, topology, err, state)
}()
g, groupCtx := errgroup.WithContext(ctx)
for _, f := range d.Tasks {
g.Go(func() error {
return f(groupCtx, resourceEvents, topology, err, state)
})
}
if err := g.Wait(); err != nil {
return err
}
waitGroup.Wait()

// run precondition reconcile function
if d.Postcondition != nil {
d.Postcondition(ctx, resourceEvents, topology, err, state)
if err := d.Postcondition(ctx, resourceEvents, topology, err, state); err != nil {
return err
}
}

return nil
}
174 changes: 174 additions & 0 deletions controller/workflow_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
package controller

import (
"context"
"fmt"
"strings"
"sync"
"testing"

"github.com/samber/lo"

"github.com/kuadrant/policy-machinery/machinery"
)

func TestWorkflow(t *testing.T) {

reconcileFuncFor := func(flag *bool, err error) ReconcileFunc {
return func(context.Context, []ResourceEvent, *machinery.Topology, error, *sync.Map) error {
*flag = true
return err
}
}

var preconditionCalled, task1Called, task2Called, postconditionCalled bool

precondition := reconcileFuncFor(&preconditionCalled, nil)
preconditionWithError := reconcileFuncFor(&preconditionCalled, fmt.Errorf("precondition error"))
task1 := reconcileFuncFor(&task1Called, nil)
task1WithError := reconcileFuncFor(&task1Called, fmt.Errorf("task1 error"))
task2 := reconcileFuncFor(&task2Called, nil)
task2WithError := reconcileFuncFor(&task2Called, fmt.Errorf("task2 error"))
postcondition := reconcileFuncFor(&postconditionCalled, nil)
postconditionWithError := reconcileFuncFor(&postconditionCalled, fmt.Errorf("postcondition error"))

testCases := []struct {
name string
workflow *Workflow
expectedPreconditionCalled bool
expectedTask1Called bool
expectedTask2Called bool
expectedPostconditionCalled bool
possibleErrs []error
}{
{
name: "empty workflow",
workflow: &Workflow{},
},
{
name: "precondition",
workflow: &Workflow{
Precondition: precondition,
},
expectedPreconditionCalled: true,
},
{
name: "precondition and tasks",
workflow: &Workflow{
Precondition: precondition,
Tasks: []ReconcileFunc{task1, task2},
},
expectedPreconditionCalled: true,
expectedTask1Called: true,
expectedTask2Called: true,
},
{
name: "precondition with error",
workflow: &Workflow{
Precondition: preconditionWithError,
Tasks: []ReconcileFunc{task1, task2},
},
expectedPreconditionCalled: true,
expectedTask1Called: false,
expectedTask2Called: false,
possibleErrs: []error{fmt.Errorf("precondition error")},
},
{
name: "task1 with error",
workflow: &Workflow{
Tasks: []ReconcileFunc{task1WithError, task2},
Postcondition: postcondition,
},
expectedTask1Called: true,
expectedTask2Called: true,
expectedPreconditionCalled: false,
possibleErrs: []error{fmt.Errorf("task1 error")},
},
{
name: "task2 with error",
workflow: &Workflow{
Tasks: []ReconcileFunc{task1, task2WithError},
Postcondition: postcondition,
},
expectedTask1Called: true,
expectedTask2Called: true,
expectedPreconditionCalled: false,
possibleErrs: []error{fmt.Errorf("task2 error")},
},
{
name: "task1 and task2 with error",
workflow: &Workflow{
Tasks: []ReconcileFunc{task1WithError, task2WithError},
Postcondition: postcondition,
},
expectedTask1Called: true,
expectedTask2Called: true,
expectedPreconditionCalled: false,
possibleErrs: []error{
fmt.Errorf("task1 error"),
fmt.Errorf("task2 error"),
},
},
{
name: "postcondition",
workflow: &Workflow{
Precondition: precondition,
Tasks: []ReconcileFunc{task1, task2},
Postcondition: postcondition,
},
expectedPreconditionCalled: true,
expectedTask1Called: true,
expectedTask2Called: true,
expectedPostconditionCalled: true,
},
{
name: "postconditions with error",
workflow: &Workflow{
Precondition: precondition,
Tasks: []ReconcileFunc{task1, task2},
Postcondition: postconditionWithError,
},
expectedPreconditionCalled: true,
expectedTask1Called: true,
expectedTask2Called: true,
expectedPostconditionCalled: true,
possibleErrs: []error{fmt.Errorf("postcondition error")},
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
// reset
preconditionCalled = false
task1Called = false
task2Called = false
postconditionCalled = false

err := tc.workflow.Run(context.Background(), nil, nil, nil, nil)
possibleErrs := lo.Map(tc.possibleErrs, func(err error, _ int) string { return err.Error() })

if tc.expectedPreconditionCalled != preconditionCalled {
t.Errorf("expected precondition to be called: %t, got %t", tc.expectedPreconditionCalled, preconditionCalled)
}
if tc.expectedTask1Called != task1Called {
t.Errorf("expected task1 to be called: %t, got %t", tc.expectedTask1Called, task1Called)
}
if tc.expectedTask2Called != task2Called {
t.Errorf("expected task2 to be called: %t, got %t", tc.expectedTask2Called, task2Called)
}
if tc.expectedPostconditionCalled != postconditionCalled {
t.Errorf("expected postcondition to be called: %t, got %t", tc.expectedPostconditionCalled, postconditionCalled)
}
if len(possibleErrs) > 0 && err == nil {
t.Errorf("expected one of the following errors (%v), got nil", strings.Join(possibleErrs, " / "))
}
if len(possibleErrs) == 0 && err != nil {
t.Errorf("expected no error, got %v", err)
}
if len(possibleErrs) > 0 && err != nil && !lo.ContainsBy(possibleErrs, func(possibleErr string) bool { return possibleErr == err.Error() }) {
t.Errorf("expected error of the following errors (%v), got %v", strings.Join(possibleErrs, " / "), err)
}
})
}

}
1 change: 1 addition & 0 deletions examples/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ require (
golang.org/x/exp v0.0.0-20240613232115-7f521ea00fb8 // indirect
golang.org/x/net v0.27.0 // indirect
golang.org/x/oauth2 v0.21.0 // indirect
golang.org/x/sync v0.8.0 // indirect
golang.org/x/sys v0.22.0 // indirect
golang.org/x/term v0.22.0 // indirect
golang.org/x/text v0.16.0 // indirect
Expand Down
2 changes: 2 additions & 0 deletions examples/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,8 @@ golang.org/x/oauth2 v0.21.0/go.mod h1:XYTD2NtWslqkgxebSiOHnXEap4TF09sJSc7H1sXbht
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.8.0 h1:3NFvSEYkUoMifnESzZl15y791HH1qU2xm6eCJU5ZPXQ=
golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
Expand Down
3 changes: 2 additions & 1 deletion examples/kuadrant/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ func buildReconciler(gatewayProviders []string, client *dynamic.DynamicClient) c
}

reconciler := &controller.Workflow{
Precondition: func(ctx context.Context, resourceEvents []controller.ResourceEvent, topology *machinery.Topology, err error, _ *sync.Map) {
Precondition: func(ctx context.Context, resourceEvents []controller.ResourceEvent, topology *machinery.Topology, err error, _ *sync.Map) error {
logger := controller.LoggerFromContext(ctx).WithName("event logger")
for _, event := range resourceEvents {
// log the event
Expand All @@ -263,6 +263,7 @@ func buildReconciler(gatewayProviders []string, client *dynamic.DynamicClient) c
}
logger.Info("new event", values...)
}
return nil
},
Tasks: []controller.ReconcileFunc{
(&reconcilers.TopologyFileReconciler{}).Reconcile,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import (

const authPathsKey = "authPaths"

func ReconcileEffectivePolicies(ctx context.Context, resourceEvents []controller.ResourceEvent, topology *machinery.Topology, err error, state *sync.Map) {
func ReconcileEffectivePolicies(ctx context.Context, resourceEvents []controller.ResourceEvent, topology *machinery.Topology, err error, state *sync.Map) error {
targetables := topology.Targetables()

// reconcile policies
Expand Down Expand Up @@ -70,6 +70,8 @@ func ReconcileEffectivePolicies(ctx context.Context, resourceEvents []controller
}

state.Store(authPathsKey, authPaths)

return nil
}

func effectivePolicyForPath[T machinery.Policy](ctx context.Context, path []machinery.Targetable) *T {
Expand Down
6 changes: 4 additions & 2 deletions examples/kuadrant/reconcilers/envoy_gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ type EnvoyGatewayProvider struct {
Client *dynamic.DynamicClient
}

func (p *EnvoyGatewayProvider) ReconcileSecurityPolicies(ctx context.Context, _ []controller.ResourceEvent, topology *machinery.Topology, err error, state *sync.Map) {
func (p *EnvoyGatewayProvider) ReconcileSecurityPolicies(ctx context.Context, _ []controller.ResourceEvent, topology *machinery.Topology, err error, state *sync.Map) error {
logger := controller.LoggerFromContext(ctx).WithName("envoy gateway").WithName("securitypolicy")
ctx = controller.LoggerIntoContext(ctx, logger)

Expand Down Expand Up @@ -59,13 +59,15 @@ func (p *EnvoyGatewayProvider) ReconcileSecurityPolicies(ctx context.Context, _
}
p.deleteSecurityPolicy(ctx, topology, gateway.GetNamespace(), gateway.GetName(), gateway)
}
return nil
}

func (p *EnvoyGatewayProvider) DeleteSecurityPolicy(ctx context.Context, resourceEvents []controller.ResourceEvent, topology *machinery.Topology, err error, _ *sync.Map) {
func (p *EnvoyGatewayProvider) DeleteSecurityPolicy(ctx context.Context, resourceEvents []controller.ResourceEvent, topology *machinery.Topology, err error, _ *sync.Map) error {
for _, resourceEvent := range resourceEvents {
gateway := resourceEvent.OldObject
p.deleteSecurityPolicy(ctx, topology, gateway.GetNamespace(), gateway.GetName(), nil)
}
return nil
}

func (p *EnvoyGatewayProvider) createSecurityPolicy(ctx context.Context, topology *machinery.Topology, gateway machinery.Targetable) {
Expand Down
Loading

0 comments on commit 350512b

Please sign in to comment.