Skip to content

Commit

Permalink
fix: resend events with exponential backoff and wait for ACK
Browse files Browse the repository at this point in the history
Signed-off-by: Chetan Banavikalmutt <[email protected]>
  • Loading branch information
chetan-rns committed Nov 18, 2024
1 parent 338dfc9 commit ce57e29
Show file tree
Hide file tree
Showing 8 changed files with 559 additions and 96 deletions.
2 changes: 2 additions & 0 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ type Agent struct {
// At present, 'watchLock' is only acquired on calls to 'addAppUpdateToQueue'. This behaviour was added as a short-term attempt to preserve update event ordering. However, it is known to be problematic due to the potential for race conditions, both within itself and with other event processors such as deleteAppCallback.
watchLock sync.RWMutex
version *version.Version

eventWriter *event.EventWriter
}

const defaultQueueName = "default"
Expand Down
45 changes: 27 additions & 18 deletions agent/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,23 +86,8 @@ func (a *Agent) sender(stream eventstreamapi.EventStream_SubscribeClient) error
return nil
}

logCtx.Tracef("Sending an item to the event stream")

pev, err := format.ToProto(ev)
if err != nil {
logCtx.Warnf("Could not wire event: %v", err)
return nil
}

err = stream.Send(&eventstreamapi.Event{Event: pev})
if err != nil {
if grpcutil.NeedReconnectOnError(err) {
return err
} else {
logCtx.Errorf("Error while sending: %v", err)
return nil
}
}
logCtx.Trace("Adding an item to the event writer", "resourceID", event.ResourceID(ev), "eventID", event.EventID(ev))
a.eventWriter.Add(ev)

return nil
}
Expand Down Expand Up @@ -130,9 +115,29 @@ func (a *Agent) receiver(stream eventstreamapi.EventStream_SubscribeClient) erro
return nil
}
logCtx.Debugf("Received a new event from stream")

if ev.Target() == event.TargetEventAck {
logCtx.Trace("Received an ACK for an event", "resourceID", ev.ResourceID(), "eventID", ev.EventID())
rawEvent, err := format.FromProto(rcvd.Event)
if err != nil {
return err
}
a.eventWriter.Remove(rawEvent)
logCtx.Trace("Removed an event from the EventWriter", "resourceID", ev.ResourceID(), "eventID", ev.EventID())
return nil
}

err = a.processIncomingEvent(ev)
if err != nil {
if err != nil && !event.IsEventDiscarded(err) && !event.IsEventNotAllowed(err) {
logCtx.WithError(err).Errorf("Unable to process incoming event")
} else {
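// Send an ACK if the event was processed successfully, or if processing returned an "event discarded" / "event not allowed" error (those errors also reach this branch).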
sendQ := a.queues.SendQ(a.remote.ClientID())
if sendQ == nil {
return fmt.Errorf("no send queue found for the remote principal")
}
sendQ.Add(a.emitter.ProcessedEvent(event.EventProcessed, ev))
logCtx.Info("Sent an ACK for an event", "resourceID", ev.ResourceID(), "eventID", ev.EventID())
}
return nil
}
Expand All @@ -144,6 +149,10 @@ func (a *Agent) handleStreamEvents() error {
if err != nil {
return err
}

a.eventWriter = event.NewEventWriter(stream)
go a.eventWriter.SendWaitingEvents(a.context)

logCtx := log().WithFields(logrus.Fields{
"module": "StreamEvent",
"server_addr": grpcutil.AddressFromContext(stream.Context()),
Expand Down
29 changes: 23 additions & 6 deletions agent/inbound.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/argoproj-labs/argocd-agent/pkg/types"
"github.com/argoproj/argo-cd/v2/pkg/apis/application/v1alpha1"
"github.com/sirupsen/logrus"
apierrors "k8s.io/apimachinery/pkg/api/errors"
)

/*
Expand Down Expand Up @@ -122,7 +123,7 @@ func (a *Agent) createApplication(incoming *v1alpha1.Application) (*v1alpha1.App
// that are incoming.
if a.mode != types.AgentModeManaged {
logCtx.Trace("Discarding this event, because agent is not in managed mode")
return nil, fmt.Errorf("cannot create application: agent is not in managed mode")
return nil, event.NewEventDiscardedErr("cannot create application: agent is not in managed mode")
}

// If we receive a new app event for an app we already manage, it usually
Expand All @@ -131,7 +132,7 @@ func (a *Agent) createApplication(incoming *v1alpha1.Application) (*v1alpha1.App
// TODO(jannfis): Handle this situation properly instead of throwing an error.
if a.appManager.IsManaged(incoming.QualifiedName()) {
logCtx.Trace("Discarding this event, because application is already managed on this agent")
return nil, fmt.Errorf("application %s is already managed", incoming.QualifiedName())
return nil, event.NewEventDiscardedErr("application %s is already managed", incoming.QualifiedName())
}

logCtx.Infof("Creating a new application on behalf of an incoming event")
Expand All @@ -143,6 +144,11 @@ func (a *Agent) createApplication(incoming *v1alpha1.Application) (*v1alpha1.App
}

created, err := a.appManager.Create(a.context, incoming)
if apierrors.IsAlreadyExists(err) {
logCtx.Debug("application already exists")
return created, nil
}

return created, err
}

Expand All @@ -156,7 +162,7 @@ func (a *Agent) updateApplication(incoming *v1alpha1.Application) (*v1alpha1.App
})
if a.appManager.IsChangeIgnored(incoming.QualifiedName(), incoming.ResourceVersion) {
logCtx.Tracef("Discarding this event, because agent has seen this version %s already", incoming.ResourceVersion)
return nil, fmt.Errorf("the version %s has already been seen by this agent", incoming.ResourceVersion)
return nil, event.NewEventDiscardedErr("the version %s has already been seen by this agent", incoming.ResourceVersion)
} else {
logCtx.Tracef("New resource version: %s", incoming.ResourceVersion)
}
Expand Down Expand Up @@ -197,6 +203,10 @@ func (a *Agent) deleteApplication(app *v1alpha1.Application) error {
deletionPropagation := backend.DeletePropagationBackground
err := a.appManager.Delete(a.context, a.namespace, app, &deletionPropagation)
if err != nil {
if apierrors.IsNotFound(err) {
logCtx.Debug("application is not found, perhaps it is already deleted")
return nil
}
return err
}
err = a.appManager.Unmanage(app.QualifiedName())
Expand All @@ -219,14 +229,14 @@ func (a *Agent) createAppProject(incoming *v1alpha1.AppProject) (*v1alpha1.AppPr
// that are incoming.
if a.mode != types.AgentModeManaged {
logCtx.Trace("Discarding this event, because agent is not in managed mode")
return nil, fmt.Errorf("cannot create appproject: agent is not in managed mode")
return nil, event.NewEventDiscardedErr("cannot create appproject: agent is not in managed mode")
}

// If we receive a new AppProject event for an AppProject we already manage, it usually
// means that we're out-of-sync from the control plane.
if a.appManager.IsManaged(incoming.Name) {
logCtx.Trace("Discarding this event, because AppProject is already managed on this agent")
return nil, fmt.Errorf("appproject %s is already managed", incoming.Name)
return nil, event.NewEventDiscardedErr("appproject %s is already managed", incoming.Name)
}

logCtx.Infof("Creating a new AppProject on behalf of an incoming event")
Expand All @@ -238,6 +248,9 @@ func (a *Agent) createAppProject(incoming *v1alpha1.AppProject) (*v1alpha1.AppPr
}

created, err := a.projectManager.Create(a.context, incoming)
if apierrors.IsAlreadyExists(err) {
logCtx.Debug("appProject already exists")
}
return created, err
}

Expand All @@ -251,7 +264,7 @@ func (a *Agent) updateAppProject(incoming *v1alpha1.AppProject) (*v1alpha1.AppPr
})
if a.appManager.IsChangeIgnored(incoming.Name, incoming.ResourceVersion) {
logCtx.Tracef("Discarding this event, because agent has seen this version %s already", incoming.ResourceVersion)
return nil, fmt.Errorf("the version %s has already been seen by this agent", incoming.ResourceVersion)
return nil, event.NewEventDiscardedErr("the version %s has already been seen by this agent", incoming.ResourceVersion)
} else {
logCtx.Tracef("New resource version: %s", incoming.ResourceVersion)
}
Expand Down Expand Up @@ -283,6 +296,10 @@ func (a *Agent) deleteAppProject(project *v1alpha1.AppProject) error {
deletionPropagation := backend.DeletePropagationBackground
err := a.projectManager.Delete(a.context, a.namespace, project, &deletionPropagation)
if err != nil {
if apierrors.IsNotFound(err) {
logCtx.Debug("appProject not found, perhaps it is already deleted")
return nil
}
return err
}
err = a.projectManager.Unmanage(project.Name)
Expand Down
Loading

0 comments on commit ce57e29

Please sign in to comment.