From ce57e29efb899999bed583b850d083e6f3666849 Mon Sep 17 00:00:00 2001 From: Chetan Banavikalmutt Date: Mon, 18 Nov 2024 18:35:41 +0530 Subject: [PATCH] fix: resend events with exponential backoff and wait for ACK Signed-off-by: Chetan Banavikalmutt --- agent/agent.go | 2 + agent/connection.go | 45 ++-- agent/inbound.go | 29 ++- internal/event/event.go | 253 ++++++++++++++++++++++ internal/event/event_test.go | 156 +++++++++++++ principal/apis/eventstream/eventstream.go | 39 ++-- principal/event.go | 94 ++++---- principal/event_test.go | 37 ++-- 8 files changed, 559 insertions(+), 96 deletions(-) diff --git a/agent/agent.go b/agent/agent.go index 6b14ecb..5b68695 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -64,6 +64,8 @@ type Agent struct { // At present, 'watchLock' is only acquired on calls to 'addAppUpdateToQueue'. This behaviour was added as a short-term attempt to preserve update event ordering. However, this is known to be problematic due to the potential for race conditions, both within itself, and between other event processors like deleteAppCallback. watchLock sync.RWMutex version *version.Version + + eventWriter *event.EventWriter } const defaultQueueName = "default" diff --git a/agent/connection.go b/agent/connection.go index 3abf675..34b73a9 100644 --- a/agent/connection.go +++ b/agent/connection.go @@ -86,23 +86,8 @@ func (a *Agent) sender(stream eventstreamapi.EventStream_SubscribeClient) error return nil } - logCtx.Tracef("Sending an item to the event stream") - - pev, err := format.ToProto(ev) - if err != nil { - logCtx.Warnf("Could not wire event: %v", err) - return nil - } - - err = stream.Send(&eventstreamapi.Event{Event: pev}) - if err != nil { - if grpcutil.NeedReconnectOnError(err) { - return err - } else { - logCtx.Errorf("Error while sending: %v", err) - return nil - } - } + logCtx.Trace("Adding an item to the event writer", "resourceID", event.ResourceID(ev), "eventID", event.EventID(ev)) + a.eventWriter.Add(ev) return nil } @@ -130,9 +115,29 @@ func (a *Agent) receiver(stream eventstreamapi.EventStream_SubscribeClient) erro return nil } logCtx.Debugf("Received a new event from stream") + + if ev.Target() == event.TargetEventAck { + logCtx.Trace("Received an ACK for an event", "resourceID", ev.ResourceID(), "eventID", ev.EventID()) + rawEvent, err := format.FromProto(rcvd.Event) + if err != nil { + return err + } + a.eventWriter.Remove(rawEvent) + logCtx.Trace("Removed an event from the EventWriter", "resourceID", ev.ResourceID(), "eventID", ev.EventID()) + return nil + } + err = a.processIncomingEvent(ev) - if err != nil { + if err != nil && !event.IsEventDiscarded(err) && !event.IsEventNotAllowed(err) { logCtx.WithError(err).Errorf("Unable to process incoming event") + } else { + // Send an ACK if the event is processed successfully. + sendQ := a.queues.SendQ(a.remote.ClientID()) + if sendQ == nil { + return fmt.Errorf("no send queue found for the remote principal") + } + sendQ.Add(a.emitter.ProcessedEvent(event.EventProcessed, ev)) + logCtx.Info("Sent an ACK for an event", "resourceID", ev.ResourceID(), "eventID", ev.EventID()) } return nil } @@ -144,6 +149,10 @@ func (a *Agent) handleStreamEvents() error { if err != nil { return err } + + a.eventWriter = event.NewEventWriter(stream) + go a.eventWriter.SendWaitingEvents(a.context) + logCtx := log().WithFields(logrus.Fields{ "module": "StreamEvent", "server_addr": grpcutil.AddressFromContext(stream.Context()), diff --git a/agent/inbound.go b/agent/inbound.go index ffb5ee2..a59cca9 100644 --- a/agent/inbound.go +++ b/agent/inbound.go @@ -22,6 +22,7 @@ import ( "github.com/argoproj-labs/argocd-agent/pkg/types" "github.com/argoproj/argo-cd/v2/pkg/apis/application/v1alpha1" "github.com/sirupsen/logrus" + apierrors "k8s.io/apimachinery/pkg/api/errors" ) /* @@ -122,7 +123,7 @@ func (a *Agent) createApplication(incoming *v1alpha1.Application) (*v1alpha1.App // that are incoming. if a.mode != types.AgentModeManaged { logCtx.Trace("Discarding this event, because agent is not in managed mode") - return nil, fmt.Errorf("cannot create application: agent is not in managed mode") + return nil, event.NewEventDiscardedErr("cannot create application: agent is not in managed mode") } // If we receive a new app event for an app we already manage, it usually @@ -131,7 +132,7 @@ func (a *Agent) createApplication(incoming *v1alpha1.Application) (*v1alpha1.App // TODO(jannfis): Handle this situation properly instead of throwing an error. if a.appManager.IsManaged(incoming.QualifiedName()) { logCtx.Trace("Discarding this event, because application is already managed on this agent") - return nil, fmt.Errorf("application %s is already managed", incoming.QualifiedName()) + return nil, event.NewEventDiscardedErr("application %s is already managed", incoming.QualifiedName()) } logCtx.Infof("Creating a new application on behalf of an incoming event") @@ -143,6 +144,11 @@ func (a *Agent) createApplication(incoming *v1alpha1.Application) (*v1alpha1.App } created, err := a.appManager.Create(a.context, incoming) + if apierrors.IsAlreadyExists(err) { + logCtx.Debug("application already exists") + return created, nil + } + return created, err } @@ -156,7 +162,7 @@ func (a *Agent) updateApplication(incoming *v1alpha1.Application) (*v1alpha1.App }) if a.appManager.IsChangeIgnored(incoming.QualifiedName(), incoming.ResourceVersion) { logCtx.Tracef("Discarding this event, because agent has seen this version %s already", incoming.ResourceVersion) - return nil, fmt.Errorf("the version %s has already been seen by this agent", incoming.ResourceVersion) + return nil, event.NewEventDiscardedErr("the version %s has already been seen by this agent", incoming.ResourceVersion) } else { logCtx.Tracef("New resource version: %s", incoming.ResourceVersion) } @@ -197,6 +203,10 @@ func (a *Agent) deleteApplication(app *v1alpha1.Application) error { deletionPropagation := backend.DeletePropagationBackground err := a.appManager.Delete(a.context, a.namespace, app, &deletionPropagation) if err != nil { + if apierrors.IsNotFound(err) { + logCtx.Debug("application is not found, perhaps it is already deleted") + return nil + } return err } err = a.appManager.Unmanage(app.QualifiedName()) @@ -219,14 +229,14 @@ func (a *Agent) createAppProject(incoming *v1alpha1.AppProject) (*v1alpha1.AppPr // that are incoming. if a.mode != types.AgentModeManaged { logCtx.Trace("Discarding this event, because agent is not in managed mode") - return nil, fmt.Errorf("cannot create appproject: agent is not in managed mode") + return nil, event.NewEventDiscardedErr("cannot create appproject: agent is not in managed mode") } // If we receive a new AppProject event for an AppProject we already manage, it usually // means that we're out-of-sync from the control plane. if a.appManager.IsManaged(incoming.Name) { logCtx.Trace("Discarding this event, because AppProject is already managed on this agent") - return nil, fmt.Errorf("appproject %s is already managed", incoming.Name) + return nil, event.NewEventDiscardedErr("appproject %s is already managed", incoming.Name) } logCtx.Infof("Creating a new AppProject on behalf of an incoming event") @@ -238,6 +248,9 @@ func (a *Agent) createAppProject(incoming *v1alpha1.AppProject) (*v1alpha1.AppPr } created, err := a.projectManager.Create(a.context, incoming) + if apierrors.IsAlreadyExists(err) { + logCtx.Debug("appProject already exists") + } return created, err } @@ -251,7 +264,7 @@ func (a *Agent) updateAppProject(incoming *v1alpha1.AppProject) (*v1alpha1.AppPr }) if a.appManager.IsChangeIgnored(incoming.Name, incoming.ResourceVersion) { logCtx.Tracef("Discarding this event, because agent has seen this version %s already", incoming.ResourceVersion) - return nil, fmt.Errorf("the version %s has already been seen by this agent", incoming.ResourceVersion) + return nil, event.NewEventDiscardedErr("the version %s has already been seen by this agent", incoming.ResourceVersion) } else { logCtx.Tracef("New resource version: %s", incoming.ResourceVersion) } @@ -283,6 +296,10 @@ func (a *Agent) deleteAppProject(project *v1alpha1.AppProject) error { deletionPropagation := backend.DeletePropagationBackground err := a.projectManager.Delete(a.context, a.namespace, project, &deletionPropagation) if err != nil { + if apierrors.IsNotFound(err) { + logCtx.Debug("appProject not found, perhaps it is already deleted") + return nil + } return err } err = a.projectManager.Unmanage(project.Name) diff --git a/internal/event/event.go b/internal/event/event.go index 6b724a0..c10bef7 100644 --- a/internal/event/event.go +++ b/internal/event/event.go @@ -15,10 +15,18 @@ package event import ( + "context" "errors" "fmt" + "sync" + "time" + "github.com/argoproj-labs/argocd-agent/internal/grpcutil" + "github.com/argoproj-labs/argocd-agent/pkg/api/grpc/eventstreamapi" "github.com/argoproj/argo-cd/v2/pkg/apis/application/v1alpha1" + "github.com/sirupsen/logrus" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/wait" _ "github.com/cloudevents/sdk-go/binding/format/protobuf/v2" format "github.com/cloudevents/sdk-go/binding/format/protobuf/v2" @@ -43,12 +51,19 @@ const ( SpecUpdate EventType = TypePrefix + ".spec-update" StatusUpdate EventType = TypePrefix + ".status-update" OperationUpdate EventType = TypePrefix + ".operation-update" + EventProcessed EventType = TypePrefix + ".processed" ) const ( TargetUnknown EventTarget = "unknown" TargetApplication EventTarget = "application" TargetAppProject EventTarget = "appproject" + TargetEventAck EventTarget = "eventProcessed" +) + +const ( + resourceID string = "resourceid" + eventID string = "eventid" ) var ( @@ -75,6 +90,13 @@ type Event struct { target EventTarget } +func New(ev *cloudevents.Event, target EventTarget) *Event { + return &Event{ + event: ev, + target: target, + } +} + func NewEventSource(source string) *EventSource { ev := &EventSource{} ev.source = source @@ -85,28 +107,66 @@ func IsEventDiscarded(err error) bool { return errors.Is(err, ErrEventDiscarded) } +func IsEventNotAllowed(err error) bool { + return errors.Is(err, ErrEventNotAllowed) +} + +func NewEventNotAllowedErr(format string, a ...any) error { + return fmt.Errorf("%w: %w", ErrEventNotAllowed, fmt.Errorf(format, a...)) +} + +func NewEventDiscardedErr(format string, a ...any) error { + return fmt.Errorf("%w: %w", ErrEventDiscarded, fmt.Errorf(format, a...)) +} + func (evs EventSource) ApplicationEvent(evType EventType, app *v1alpha1.Application) *cloudevents.Event { cev := cloudevents.NewEvent() cev.SetSource(evs.source) cev.SetSpecVersion(cloudEventSpecVersion) cev.SetType(evType.String()) + cev.SetExtension(eventID, createEventID(app.ObjectMeta)) + cev.SetExtension(resourceID, createResourceID(app.ObjectMeta)) cev.SetDataSchema(TargetApplication.String()) // TODO: Handle this error situation? _ = cev.SetData(cloudevents.ApplicationJSON, app) return &cev } +func createResourceID(res v1.ObjectMeta) string { + return fmt.Sprintf("%s_%s", res.Name, res.UID) +} + +func createEventID(res v1.ObjectMeta) string { + return fmt.Sprintf("%s_%s_%s", res.Name, res.UID, res.ResourceVersion) +} + func (evs EventSource) AppProjectEvent(evType EventType, appProject *v1alpha1.AppProject) *cloudevents.Event { cev := cloudevents.NewEvent() cev.SetSource(evs.source) cev.SetSpecVersion(cloudEventSpecVersion) cev.SetType(evType.String()) + cev.SetExtension(eventID, createEventID(appProject.ObjectMeta)) + cev.SetExtension(resourceID, createResourceID(appProject.ObjectMeta)) cev.SetDataSchema(TargetAppProject.String()) // TODO: Handle this error situation? _ = cev.SetData(cloudevents.ApplicationJSON, appProject) return &cev } +func (evs EventSource) ProcessedEvent(evType EventType, ev *Event) *cloudevents.Event { + cev := cloudevents.NewEvent() + cev.SetSource(evs.source) + cev.SetSpecVersion(cloudEventSpecVersion) + cev.SetType(evType.String()) + + for k, v := range ev.event.Extensions() { + cev.SetExtension(k, v) + } + + cev.SetDataSchema(TargetEventAck.String()) + return &cev +} + // FromWire validates an event from the wire in protobuf format, converts it // into an Event object and returns it. If the event on the wire is invalid, // or could not be converted for another reason, FromWire returns an error. @@ -130,6 +190,8 @@ func Target(raw *cloudevents.Event) EventTarget { return TargetApplication case TargetAppProject.String(): return TargetAppProject + case TargetEventAck.String(): + return TargetEventAck } return "" } @@ -148,8 +210,199 @@ func (ev Event) Application() (*v1alpha1.Application, error) { return app, err } +func (ev Event) ResourceID() string { + return ResourceID(ev.event) +} + +func (ev Event) EventID() string { + return EventID(ev.event) +} + +func ResourceID(ev *cloudevents.Event) string { + id, ok := ev.Extensions()[resourceID].(string) + if ok { + return id + } + + return "" +} + +func EventID(ev *cloudevents.Event) string { + id, ok := ev.Extensions()[eventID].(string) + if ok { + return id + } + + return "" +} + func (ev Event) AppProject() (*v1alpha1.AppProject, error) { proj := &v1alpha1.AppProject{} err := ev.event.DataAs(proj) return proj, err } + +type streamWriter interface { + Send(*eventstreamapi.Event) error + Context() context.Context +} + +// EventWriter keeps track of the latest event for resources and sends them on a given gRPC stream. +// It resends the event with exponential backoff until the event is ACK'd and removed from its list. +type EventWriter struct { + mu sync.RWMutex + + // key: resource name + UID + // value: latest event for a resource + // - acquire 'lock' before accessing + latestEvents map[string]*eventMessage + + // target refers to the specified gRPC stream. + target streamWriter + + log *logrus.Entry +} + +type eventMessage struct { + mu sync.RWMutex + + // latest event for a resource + // - acquire 'lock' before accessing + event *cloudevents.Event + + // retry sending the event after this time + retryAfter *time.Time + + // config for exponential backoff + backoff *wait.Backoff +} + +func NewEventWriter(target streamWriter) *EventWriter { + return &EventWriter{ + latestEvents: map[string]*eventMessage{}, + target: target, + log: logrus.WithFields(logrus.Fields{ + "module": "EventWriter", + "client_addr": grpcutil.AddressFromContext(target.Context()), + }), + } +} + +func (ew *EventWriter) Add(ev *cloudevents.Event) { + resID := ResourceID(ev) + + ew.mu.Lock() + defer ew.mu.Unlock() + + defaultBackoff := wait.Backoff{ + Steps: 5, + Duration: 5 * time.Second, + Factor: 2.0, + Jitter: 0.1, + } + + eventMsg, exists := ew.latestEvents[resID] + if !exists { + ew.latestEvents[resID] = &eventMessage{ + event: ev, + backoff: &defaultBackoff, + } + return + } + + // Replace the old event and reset the backoff. + eventMsg.mu.Lock() + eventMsg.event = ev + eventMsg.backoff = &defaultBackoff + eventMsg.retryAfter = nil + eventMsg.mu.Unlock() +} + +func (ew *EventWriter) Get(resID string) *eventMessage { + ew.mu.RLock() + defer ew.mu.RUnlock() + return ew.latestEvents[resID] +} + +func (ew *EventWriter) Remove(ev *cloudevents.Event) { + ew.mu.Lock() + defer ew.mu.Unlock() + + // Remove the event only if it matches both the resourceID and eventID. + resourceID := ResourceID(ev) + latestEvent, exists := ew.latestEvents[resourceID] + if !exists { + return + } + + if EventID(latestEvent.event) == EventID(ev) { + delete(ew.latestEvents, resourceID) + } +} + +func (ew *EventWriter) SendWaitingEvents(ctx context.Context) { + ew.log.Info("Starting event writer") + for { + select { + case <-ctx.Done(): + ew.log.Info("Shutting down event writer") + return + default: + ew.mu.RLock() + resourceIDs := make([]string, 0, len(ew.latestEvents)) + for resID := range ew.latestEvents { + resourceIDs = append(resourceIDs, resID) + } + ew.mu.RUnlock() + + for _, resourceID := range resourceIDs { + ew.sendEvent(resourceID) + } + } + time.Sleep(100 * time.Millisecond) + } +} + +func (ew *EventWriter) sendEvent(resID string) { + // Check if the event is already ACK'd. + eventMsg := ew.Get(resID) + if eventMsg == nil { + ew.log.Trace("event is not found, perhaps it is already ACK'd", "resourceID", resID) + return + } + + eventMsg.mu.Lock() + defer eventMsg.mu.Unlock() + + // Check if it is time to resend the event. + if eventMsg.retryAfter != nil && eventMsg.retryAfter.After(time.Now()) { + return + } + + defer func() { + // Update the retryAfter for resending the event again + retryAfter := time.Now().Add(eventMsg.backoff.Step()) + eventMsg.retryAfter = &retryAfter + }() + + // Resend the event since it is not ACK'd. + pev, err := format.ToProto(eventMsg.event) + if err != nil { + ew.log.Errorf("Could not wire event: %v\n", err) + return + } + + err = ew.target.Send(&eventstreamapi.Event{Event: pev}) + if err != nil { + ew.log.Errorf("Error while sending: %v\n", err) + return + } + + ew.log.Trace("event sent to target", "resourceID", resID, "type", eventMsg.event.Type()) + + // We don't have to wait for an ACK if the current event is ACK. So, remove it from the EventWriter. + if Target(eventMsg.event) == TargetEventAck { + ew.Remove(eventMsg.event) + ew.log.Trace("ACK is removed from the EventWriter", "resourceID", resID, "eventID", EventID(eventMsg.event)) + } +} diff --git a/internal/event/event_test.go b/internal/event/event_test.go index 1abe41e..036a7fc 100644 --- a/internal/event/event_test.go +++ b/internal/event/event_test.go @@ -13,3 +13,159 @@ // limitations under the License. package event + +import ( + "context" + "fmt" + "sync" + "testing" + "time" + + "github.com/argoproj-labs/argocd-agent/pkg/api/grpc/eventstreamapi" + "github.com/argoproj/argo-cd/v2/pkg/apis/application/v1alpha1" + "github.com/stretchr/testify/require" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +func TestEventWriter(t *testing.T) { + es := NewEventSource("test") + + app1 := &v1alpha1.Application{ + ObjectMeta: metav1.ObjectMeta{ + Name: "app1", + Namespace: "test", + ResourceVersion: "1", + UID: "1234", + }, + } + + app2 := &v1alpha1.Application{ + ObjectMeta: metav1.ObjectMeta{ + Name: "app2", + Namespace: "test", + ResourceVersion: "1", + UID: "1234", + }, + } + + t.Run("should add/update/remove events from the queue", func(t *testing.T) { + fs := &fakeStream{} + evSender := NewEventWriter(fs) + + ev := es.ApplicationEvent(Create, app1) + evSender.Add(ev) + + latestEvent := evSender.Get(ResourceID(ev)) + require.NotNil(t, latestEvent) + require.Equal(t, latestEvent.event, ev) + require.Nil(t, latestEvent.retryAfter) + now := time.Now() + latestEvent.retryAfter = &now + + // Add an Update event for the same resource + app1.ResourceVersion = "2" + ev = es.ApplicationEvent(Update, app1) + evSender.Add(ev) + + latestEvent = evSender.Get(ResourceID(ev)) + require.NotNil(t, latestEvent) + require.Equal(t, latestEvent.event, ev) + require.Nil(t, latestEvent.retryAfter) + + // Try removing an event with the same resourceID but different eventID. + app1.ResourceVersion = "3" + newEv := es.ApplicationEvent(Update, app1) + evSender.Remove(newEv) + + // The old event should not removed from the queue. + latestEvent = evSender.Get(ResourceID(newEv)) + require.NotNil(t, latestEvent) + require.Equal(t, latestEvent.event, ev) + + // The event will be removed only if both the resourceID and eventID matches. + evSender.Remove(ev) + latestEvent = evSender.Get(ResourceID(ev)) + require.Nil(t, latestEvent) + }) + + t.Run("should handle events from multiple resources", func(t *testing.T) { + fs := &fakeStream{} + evSender := NewEventWriter(fs) + + app1Events := []EventType{Create, Update, Delete} + app2Events := []EventType{Create, Update, Update, Delete} + + for v, e := range app2Events { + app2.ResourceVersion = fmt.Sprintf("%d", v) + evSender.Add(es.ApplicationEvent(e, app2)) + } + + for v, e := range app1Events { + app1.ResourceVersion = fmt.Sprintf("%d", v) + evSender.Add(es.ApplicationEvent(e, app1)) + } + + require.Len(t, evSender.latestEvents, 2) + latestApp1Event := evSender.Get(createResourceID(app1.ObjectMeta)) + require.NotNil(t, latestApp1Event) + require.Equal(t, createEventID(app1.ObjectMeta), EventID(latestApp1Event.event)) + + latestApp2Event := evSender.Get(createResourceID(app2.ObjectMeta)) + require.NotNil(t, latestApp2Event) + require.Equal(t, createEventID(app2.ObjectMeta), EventID(latestApp2Event.event)) + }) + + t.Run("should send waiting events to the stream", func(t *testing.T) { + fs := &fakeStream{} + evSender := NewEventWriter(fs) + + ev := es.ApplicationEvent(Create, app1) + resID := createResourceID(app1.ObjectMeta) + evSender.Add(ev) + + // shouldn't send an event that is not being tracked + evSender.sendEvent("random-id") + require.Len(t, fs.events, 0) + + // shouldn't send an event that isn't past the retryAfter time. + latestEvent := evSender.Get(resID) + require.NotNil(t, latestEvent) + retryAfter := time.Now().Add(1 * time.Hour) + latestEvent.retryAfter = &retryAfter + + evSender.sendEvent(resID) + require.Len(t, fs.events, 0) + + // should send a valid event to the stream + retryAfter = time.Now().Add(-10 * time.Second) + latestEvent.retryAfter = &retryAfter + evSender.sendEvent(resID) + require.Len(t, fs.events, 1) + require.Equal(t, []string{createEventID(app1.ObjectMeta)}, fs.events[resID]) + }) +} + +type fakeStream struct { + mu sync.RWMutex + events map[string][]string +} + +func (fs *fakeStream) Send(event *eventstreamapi.Event) error { + fs.mu.Lock() + defer fs.mu.Unlock() + ev, err := FromWire(event.Event) + if err != nil { + return err + } + + if fs.events == nil { + fs.events = map[string][]string{} + } + + fs.events[ResourceID(ev.event)] = append(fs.events[ResourceID(ev.event)], EventID(ev.event)) + return nil +} + +func (fs *fakeStream) Context() context.Context { + return context.Background() +} diff --git a/principal/apis/eventstream/eventstream.go b/principal/apis/eventstream/eventstream.go index 4a23064..1ffae1f 100644 --- a/principal/apis/eventstream/eventstream.go +++ b/principal/apis/eventstream/eventstream.go @@ -21,6 +21,7 @@ import ( "sync" "time" + "github.com/argoproj-labs/argocd-agent/internal/event" "github.com/argoproj-labs/argocd-agent/internal/queue" "github.com/argoproj-labs/argocd-agent/pkg/api/grpc/eventstreamapi" "github.com/argoproj-labs/argocd-agent/pkg/types" @@ -47,6 +48,8 @@ type Server struct { options *ServerOptions queues queue.QueuePair + + eventWriter *event.EventWriter } type ServerOptions struct { @@ -153,7 +156,7 @@ func (s *Server) onDisconnect(c *client) { func (s *Server) recvFunc(c *client, subs eventstreamapi.EventStream_SubscribeServer) error { logCtx := c.logCtx.WithField("direction", "recv") logCtx.Tracef("Waiting to receive from channel") - event, err := subs.Recv() + streamEvent, err := subs.Recv() if err != nil { if err == io.EOF { logCtx.Tracef("Remote end hung up") @@ -171,11 +174,11 @@ func (s *Server) recvFunc(c *client, subs eventstreamapi.EventStream_SubscribeSe } return err } - if event == nil || event.Event == nil { + if streamEvent == nil || streamEvent.Event == nil { return fmt.Errorf("invalid wire transmission") } app := &v1alpha1.Application{} - incomingEvent, err := format.FromProto(event.Event) + incomingEvent, err := format.FromProto(streamEvent.Event) if err != nil { return fmt.Errorf("could not unserialize event from wire: %w", err) } @@ -189,6 +192,13 @@ func (s *Server) recvFunc(c *client, subs eventstreamapi.EventStream_SubscribeSe return fmt.Errorf("panic: no recvq for agent %s", c.agentName) } + if event.Target(incomingEvent) == event.TargetEventAck { + logCtx.Trace("Received an ACK event", "resourceID", event.ResourceID(incomingEvent), "eventID", event.EventID(incomingEvent)) + s.eventWriter.Remove(incomingEvent) + logCtx.Trace("Removed the ACK from the EventWriter") + return nil + } + q.Add(incomingEvent) return nil } @@ -225,27 +235,11 @@ func (s *Server) sendFunc(c *client, subs eventstreamapi.EventStream_SubscribeSe return fmt.Errorf("panic: invalid data in sendqueue: want: %T, have %T", cloudevents.Event{}, item) } - prEv, err := format.ToProto(ev) - if err != nil { - return fmt.Errorf("panic: could not serialize event to wire format: %v", err) - } + logCtx.Trace("Adding an item to the event writer", "resourceID", event.ResourceID(ev), "eventID", event.EventID(ev)) + s.eventWriter.Add(ev) q.Done(item) - logCtx.Tracef("Sending an item to the event stream") - - // A Send() on the stream is actually not blocking. - err = subs.Send(&eventstreamapi.Event{Event: prEv}) - if err != nil { - status, ok := status.FromError(err) - if !ok && err != io.EOF { - return fmt.Errorf("error sending data to stream: %w", err) - } - if err == io.EOF || status.Code() == codes.Unavailable { - return fmt.Errorf("remote hung up while sending data to stream: %w", err) - } - } - return nil } @@ -262,6 +256,9 @@ func (s *Server) Subscribe(subs eventstreamapi.EventStream_SubscribeServer) erro return err } + s.eventWriter = event.NewEventWriter(subs) + go s.eventWriter.SendWaitingEvents(c.ctx) + // We receive events in a dedicated go routine c.wg.Add(1) go func() { diff --git a/principal/event.go b/principal/event.go index dc77db3..5a56432 100644 --- a/principal/event.go +++ b/principal/event.go @@ -16,14 +16,12 @@ package principal import ( "context" - "errors" "fmt" "time" "github.com/argoproj-labs/argocd-agent/internal/backend" "github.com/argoproj-labs/argocd-agent/internal/event" "github.com/argoproj-labs/argocd-agent/internal/namedlock" - "github.com/argoproj-labs/argocd-agent/pkg/types" "github.com/argoproj/argo-cd/v2/pkg/apis/application/v1alpha1" "github.com/sirupsen/logrus" "golang.org/x/sync/semaphore" @@ -38,12 +36,12 @@ import ( // processRecvQueue processes an entry from the receiver queue, which holds the // events received by agents. It will trigger updates of resources in the // server's backend. -func (s *Server) processRecvQueue(ctx context.Context, agentName string, q workqueue.RateLimitingInterface) error { +func (s *Server) processRecvQueue(ctx context.Context, agentName string, q workqueue.RateLimitingInterface) (*cloudevents.Event, error) { i, _ := q.Get() ev, ok := i.(*cloudevents.Event) if !ok { q.Done(i) - return fmt.Errorf("invalid data in queue: have:%T want:%T", i, ev) + return nil, fmt.Errorf("invalid data in queue: have:%T want:%T", i, ev) } agentMode := s.agentMode(agentName) incoming := &v1alpha1.Application{} @@ -67,7 +65,7 @@ func (s *Server) processRecvQueue(ctx context.Context, agentName string, q workq err = fmt.Errorf("unable to process event with unknown target %s", target) } q.Done(i) - return err + return ev, err } func (s *Server) processApplicationEvent(ctx context.Context, agentName string, ev *cloudevents.Event) error { @@ -100,50 +98,53 @@ func (s *Server) processApplicationEvent(ctx context.Context, agentName string, // App creation event will only be processed in autonomous mode case event.Create.String(): - if agentMode.IsAutonomous() { - incoming.SetNamespace(agentName) - _, err := s.appManager.Create(ctx, incoming) - if err != nil { - return fmt.Errorf("could not create application %s: %w", incoming.QualifiedName(), err) - } - } else { - logCtx.Debugf("Discarding event, because agent is not in autonomous mode") + if !agentMode.IsAutonomous() { + logCtx.Debug("Discarding event, because agent is not in autonomous mode") return event.ErrEventDiscarded } + + incoming.SetNamespace(agentName) + _, err := s.appManager.Create(ctx, incoming) + if err != nil && !kerrors.IsAlreadyExists(err) { + return fmt.Errorf("could not create application %s: %w", incoming.QualifiedName(), err) + } // Spec updates are only allowed in autonomous mode case event.SpecUpdate.String(): - var err error - if agentMode == types.AgentModeAutonomous { - _, err = s.appManager.UpdateAutonomousApp(ctx, agentName, incoming) - } else { - err = fmt.Errorf("event type not allowed when mode is not autonomous") + if !agentMode.IsAutonomous() { + logCtx.Debug("Discarding event, because agent is not in autonomous mode") + return event.NewEventNotAllowedErr("event type not allowed when mode is not autonomous") } + + _, err := s.appManager.UpdateAutonomousApp(ctx, agentName, incoming) if err != nil { return fmt.Errorf("could not update application status for %s: %w", incoming.QualifiedName(), err) } logCtx.Infof("Updated application status %s", incoming.QualifiedName()) // Status updates are only allowed in managed mode case event.StatusUpdate.String(): - var err error - if agentMode == types.AgentModeManaged { - _, err = s.appManager.UpdateStatus(ctx, agentName, incoming) - } else { - err = fmt.Errorf("event type not allowed when mode is not managed") + if !agentMode.IsManaged() { + logCtx.Debug("Discarding event, because agent is not in managed mode") + return event.NewEventNotAllowedErr("event type not allowed when mode is not managed") } + + _, err := s.appManager.UpdateStatus(ctx, agentName, incoming) if err != nil { return fmt.Errorf("could not update application status for %s: %w", incoming.QualifiedName(), err) } logCtx.Infof("Updated application spec %s", incoming.QualifiedName()) // App deletion case event.Delete.String(): - var err error - if agentMode.IsManaged() { - err = errors.New("event type not allowed when mode is not autonomous") - } else { - deletionPropagation := backend.DeletePropagationForeground - err = s.appManager.Delete(ctx, agentName, incoming, &deletionPropagation) + if !agentMode.IsAutonomous() { + logCtx.Debug("Discarding event, because agent is not in autonomous mode") + return event.NewEventNotAllowedErr("event type not allowed when mode is not autonomous") } + + deletionPropagation := backend.DeletePropagationForeground + err := s.appManager.Delete(ctx, agentName, incoming, &deletionPropagation) if err != nil { + if kerrors.IsNotFound(err) { + return nil + } return fmt.Errorf("could not delete application %s: %w", incoming.QualifiedName(), err) } logCtx.Infof("Deleted application %s", incoming.QualifiedName()) @@ -186,13 +187,12 @@ func (s *Server) processAppProjectEvent(ctx context.Context, agentName string, e } // AppProject deletion case event.Delete.String(): - var err error - if agentMode.IsManaged() { - err = errors.New("event type not allowed when mode is not autonomous") - } else { - deletionPropagation := backend.DeletePropagationForeground - err = s.projectManager.Delete(ctx, agentName, incoming, &deletionPropagation) + if !agentMode.IsAutonomous() { + return event.NewEventNotAllowedErr("event type not allowed when mode is not autonomous") } + + deletionPropagation := backend.DeletePropagationForeground + err := s.projectManager.Delete(ctx, agentName, incoming, &deletionPropagation) if err != nil { return fmt.Errorf("could not delete app-project %s: %w", incoming.Name, err) } @@ -264,10 +264,30 @@ func (s *Server) eventProcessor(ctx context.Context) error { sem.Release(1) queueLock.Unlock(agentName) }() - err := s.processRecvQueue(ctx, agentName, q) - if err != nil { + + i, _ := q.Get() + ev, ok := i.(*cloudevents.Event) + if !ok { + q.Done(i) + logCtx.Errorf("invalid data in queue: have:%T want:%T", i, ev) + return + } + + ev, err := s.processRecvQueue(ctx, agentName, q) + if err != nil && !event.IsEventDiscarded(err) && !event.IsEventNotAllowed(err) { logCtx.WithField("client", agentName).WithError(err).Errorf("Could not process agent recveiver queue") + return + } + q.Done(ev) + + // Send an ACK if the event is processed successfully. + sendQ := s.queues.SendQ(queueName) + if sendQ == nil { + logCtx.Debugf("Queue disappeared -- client probably has disconnected") + return } + logCtx.Trace("sending an ACK", "resourceID", event.ResourceID(ev), "eventID", event.EventID(ev)) + sendQ.Add(s.events.ProcessedEvent(event.EventProcessed, event.New(ev, event.TargetEventAck))) }(queueName, q) } } diff --git a/principal/event_test.go b/principal/event_test.go index 017ef54..c13f420 100644 --- a/principal/event_test.go +++ b/principal/event_test.go @@ -26,7 +26,6 @@ import ( cloudevents "github.com/cloudevents/sdk-go/v2" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "k8s.io/apimachinery/pkg/api/errors" v1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) @@ -38,9 +37,9 @@ func Test_InvalidEvents(t *testing.T) { wq.On("Done", &item) s, err := NewServer(context.Background(), kube.NewKubernetesFakeClient(), "argocd", WithGeneratedTokenSigningKey()) require.NoError(t, err) - err = s.processRecvQueue(context.Background(), "foo", wq) + got, err := s.processRecvQueue(context.Background(), "foo", wq) assert.ErrorContains(t, err, "invalid data in queue") - + assert.Nil(t, got) }) t.Run("Unknown event schema", func(t *testing.T) { @@ -51,8 +50,9 @@ func Test_InvalidEvents(t *testing.T) { wq.On("Done", &ev) s, err := NewServer(context.Background(), kube.NewKubernetesFakeClient(), "argocd", WithGeneratedTokenSigningKey()) require.NoError(t, err) - err = s.processRecvQueue(context.Background(), "foo", wq) + got, err := s.processRecvQueue(context.Background(), "foo", wq) assert.ErrorContains(t, err, "unable to process event with unknown target") + assert.Equal(t, ev, *got) }) t.Run("Unknown event type", func(t *testing.T) { @@ -64,8 +64,9 @@ func Test_InvalidEvents(t *testing.T) { wq.On("Done", &ev) s, err := NewServer(context.Background(), kube.NewKubernetesFakeClient(), "argocd", WithGeneratedTokenSigningKey()) require.NoError(t, err) - err = s.processRecvQueue(context.Background(), "foo", wq) + got, err := s.processRecvQueue(context.Background(), "foo", wq) assert.ErrorContains(t, err, "unable to process event of type application") + assert.Equal(t, ev, *got) }) t.Run("Invalid data in event", func(t *testing.T) { @@ -78,8 +79,9 @@ func Test_InvalidEvents(t *testing.T) { wq.On("Done", &ev) s, err := NewServer(context.Background(), kube.NewKubernetesFakeClient(), "argocd", WithGeneratedTokenSigningKey()) require.NoError(t, err) - err = s.processRecvQueue(context.Background(), "foo", wq) + got, err := s.processRecvQueue(context.Background(), "foo", wq) assert.ErrorContains(t, err, "failed to unmarshal") + assert.Equal(t, ev, *got) }) } @@ -93,8 +95,9 @@ func Test_CreateEvents(t *testing.T) { wq.On("Done", &ev) s, err := NewServer(context.Background(), kube.NewKubernetesFakeClient(), "argocd", WithGeneratedTokenSigningKey()) require.NoError(t, err) - err = s.processRecvQueue(context.Background(), "foo", wq) + got, err := s.processRecvQueue(context.Background(), "foo", wq) assert.ErrorIs(t, err, event.ErrEventDiscarded) + assert.Equal(t, ev, *got) }) t.Run("Create application in autonomous mode", func(t *testing.T) { @@ -130,7 +133,8 @@ func Test_CreateEvents(t *testing.T) { s, err := NewServer(context.Background(), fac, "argocd", WithGeneratedTokenSigningKey(), WithAutoNamespaceCreate(true, "", nil)) require.NoError(t, err) s.setAgentMode("foo", types.AgentModeAutonomous) - err = s.processRecvQueue(context.Background(), "foo", wq) + got, err := s.processRecvQueue(context.Background(), "foo", wq) + assert.Equal(t, ev, *got) assert.NoError(t, err) napp, err := fac.ApplicationsClientset.ArgoprojV1alpha1().Applications("foo").Get(context.TODO(), "test", v1.GetOptions{}) assert.NoError(t, err) @@ -177,8 +181,9 @@ func Test_CreateEvents(t *testing.T) { s, err := NewServer(context.Background(), fac, "argocd", WithGeneratedTokenSigningKey()) require.NoError(t, err) s.setAgentMode("foo", types.AgentModeAutonomous) - err = s.processRecvQueue(context.Background(), "foo", wq) - assert.True(t, errors.IsAlreadyExists(err)) + got, err := s.processRecvQueue(context.Background(), "foo", wq) + assert.Nil(t, err) + require.Equal(t, ev, *got) }) } @@ -238,7 +243,8 @@ func Test_UpdateEvents(t *testing.T) { s, err := NewServer(context.Background(), fac, "argocd", WithGeneratedTokenSigningKey()) require.NoError(t, err) s.setAgentMode("foo", types.AgentModeAutonomous) - err = s.processRecvQueue(context.Background(), "foo", wq) + got, err := s.processRecvQueue(context.Background(), "foo", wq) + require.Equal(t, ev, *got) assert.NoError(t, err) napp, err := fac.ApplicationsClientset.ArgoprojV1alpha1().Applications("foo").Get(context.TODO(), "test", v1.GetOptions{}) assert.NoError(t, err) @@ -281,7 +287,8 @@ func Test_UpdateEvents(t *testing.T) { s, err := NewServer(context.Background(), fac, "argocd", WithGeneratedTokenSigningKey()) require.NoError(t, err) s.setAgentMode("foo", types.AgentModeManaged) - err = s.processRecvQueue(context.Background(), "foo", wq) + got, err := s.processRecvQueue(context.Background(), "foo", wq) + require.Equal(t, ev, *got) assert.ErrorContains(t, err, "event type not allowed") }) @@ -336,7 +343,8 @@ func Test_processAppProjectEvent(t *testing.T) { wq.On("Done", &ev) s, err := NewServer(context.Background(), kube.NewKubernetesFakeClient(), "argocd", WithGeneratedTokenSigningKey()) require.NoError(t, err) - err = s.processRecvQueue(context.Background(), "foo", wq) + got, err := s.processRecvQueue(context.Background(), "foo", wq) + require.Equal(t, ev, *got) assert.ErrorIs(t, err, event.ErrEventDiscarded) }) @@ -361,7 +369,8 @@ func Test_processAppProjectEvent(t *testing.T) { s, err := NewServer(context.Background(), fac, "argocd", WithGeneratedTokenSigningKey()) require.NoError(t, err) s.setAgentMode("foo", types.AgentModeManaged) - err = s.processRecvQueue(context.Background(), "foo", wq) + got, err := s.processRecvQueue(context.Background(), "foo", wq) + require.Equal(t, ev, *got) assert.ErrorContains(t, err, "event type not allowed") }) }