fix: race condition while collecting runtime events (#3917)
Fixes a race condition where multiple goroutines would try to append
to the same slice concurrently.
We would end up losing some RuntimeEvents.
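
As a rough illustration (a standalone sketch, not the provisioner code), concurrent appends to a shared slice can silently drop elements because append reads and writes the slice header without synchronization:

package main

import (
	"fmt"
	"sync"
)

func main() {
	var events []int // shared slice, appended to from many goroutines
	var wg sync.WaitGroup
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func(n int) {
			defer wg.Done()
			// Two goroutines can observe the same length and backing array,
			// so one append overwrites the other and an element is lost.
			events = append(events, n)
		}(i)
	}
	wg.Wait()
	// Typically prints fewer than 100; `go run -race` reports the data race.
	fmt.Println("collected", len(events), "of 100 events")
}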

The fix publishes each step's completion to a channel, aggregates the
events in a single goroutine, and then marks the step as complete.
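
A minimal sketch of that pattern (illustrative names, not the provisioner's types): producers only publish to a channel, and a single collector goroutine owns the slice, so appends are serialized:

package main

import (
	"fmt"
	"sync"
)

type runtimeEvent struct{ id int }

func main() {
	completions := make(chan runtimeEvent, 16)

	var wg sync.WaitGroup
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func(n int) {
			defer wg.Done()
			completions <- runtimeEvent{id: n} // publish instead of appending
		}(i)
	}

	// Close the channel once every producer has finished publishing.
	go func() {
		wg.Wait()
		close(completions)
	}()

	// Single collector: the only goroutine that touches the slice.
	var events []runtimeEvent
	for e := range completions {
		events = append(events, e)
	}
	fmt.Println("collected", len(events), "of 100 events") // always 100
}

In the diff below, the collector goroutine also marks each step done only after recording its event, so a task cannot be reported complete before its events have been aggregated.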

I only started seeing the bug when pubsub provisioning began completing
multiple steps in quick succession:
#3916
matt2e authored Jan 7, 2025
1 parent 81cb790 commit 0fbebeb
Showing 1 changed file with 40 additions and 16 deletions.
backend/provisioner/inmem_provisioner.go: 56 changes (40 additions & 16 deletions)
@@ -6,6 +6,7 @@ import (

 	"connectrpc.com/connect"
 	"github.com/alecthomas/atomic"
+	"github.com/alecthomas/types/optional"
 	"github.com/google/uuid"
 	"github.com/puzpuzpuz/xsync/v3"

@@ -15,6 +16,7 @@ import (
 	schemapb "github.com/block/ftl/common/protos/xyz/block/ftl/schema/v1"
 	"github.com/block/ftl/common/schema"
 	"github.com/block/ftl/common/slices"
+	"github.com/block/ftl/internal/channels"
 	"github.com/block/ftl/internal/log"
 )

@@ -75,6 +77,11 @@ func (d *InMemProvisioner) Ping(context.Context, *connect.Request[ftlv1.PingRequ
 	return &connect.Response[ftlv1.PingResponse]{}, nil
 }

+type stepCompletedEvent struct {
+	step  *inMemProvisioningStep
+	event optional.Option[*RuntimeEvent]
+}
+
 func (d *InMemProvisioner) Provision(ctx context.Context, req *connect.Request[provisioner.ProvisionRequest]) (*connect.Response[provisioner.ProvisionResponse], error) {
 	logger := log.FromContext(ctx)

@@ -95,36 +102,53 @@ func (d *InMemProvisioner) Provision(ctx context.Context, req *connect.Request[p
 	desiredNodes := schema.GetProvisioned(desiredModule)

 	task := &inMemProvisioningTask{}
+	// use chans to safely collect all events before completing each task
+	completions := make(chan stepCompletedEvent, 16)

 	for id, desired := range desiredNodes {
 		previous, ok := previousNodes[id]

 		for _, resource := range desired.GetProvisioned() {
 			if !ok || !resource.IsEqual(previous.GetProvisioned().Get(resource.Kind)) {
 				if slices.Contains(kinds, resource.Kind) {
-					if handler, ok := d.handlers[resource.Kind]; ok {
-						step := &inMemProvisioningStep{Done: atomic.New(false)}
-						task.steps = append(task.steps, step)
-						go func() {
-							defer step.Done.Store(true)
-							event, err := handler(ctx, desiredModule.Name, desired)
-							if err != nil {
-								step.Err = err
-								logger.Errorf(err, "failed to provision resource %s:%s", resource.Kind, desired.ResourceID())
-								return
-							}
-							if event != nil {
-								task.events = append(task.events, event)
-							}
-						}()
-					} else {
+					handler, ok := d.handlers[resource.Kind]
+					if !ok {
 						err := fmt.Errorf("unsupported resource type: %s", resource.Kind)
 						return nil, connect.NewError(connect.CodeInvalidArgument, err)
 					}
+					step := &inMemProvisioningStep{Done: atomic.New(false)}
+					task.steps = append(task.steps, step)
+					go func() {
+						event, err := handler(ctx, desiredModule.Name, desired)
+						if err != nil {
+							step.Err = err
+							logger.Errorf(err, "failed to provision resource %s:%s", resource.Kind, desired.ResourceID())
+							completions <- stepCompletedEvent{step: step}
+							return
+						}
+						completions <- stepCompletedEvent{
+							step:  step,
+							event: optional.From(event, event != nil),
+						}
+					}()
 				}
 			}
 		}
 	}

+	go func() {
+		for c := range channels.IterContext(ctx, completions) {
+			if e, ok := c.event.Get(); ok {
+				task.events = append(task.events, e)
+			}
+			c.step.Done.Store(true)
+			done, err := task.Done()
+			if done || err != nil {
+				return
+			}
+		}
+	}()
+
 	token := uuid.New().String()
 	logger.Debugf("started a task with token %s", token)
 	d.running.Store(token, task)
