
feat: resend events with exponential backoff and wait for ACK #225

Open · wants to merge 3 commits into main from agent-principal-resync
Conversation

chetan-rns (Collaborator)

  • Added a new event type ACK that is sent by the principal/agent when an event is processed successfully.
  • EventWriter keeps track of the latest events and sends them on the gRPC stream. Both the agent and the principal add events to the EventWriter instead of sending them directly on the stream. The EventWriter resends each event with exponential backoff until an ACK arrives from the other side, continuously looping over the waiting events so that they propagate as quickly as possible without blocking one another.
  • Added UID (resourceID) and resourceVersion (eventID) to the cloudevent as extensions, to map an event to a particular state of a resource. For example, when we receive an ACK for an event we want to know whether the ACK is for the latest version of that resource (a sketch follows below).
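For illustration, here is a minimal sketch of how the UID and resourceVersion might be attached as CloudEvents extensions with the Go SDK. The extension names and event type are assumptions for the sketch; the resourceID format mirrors the createResourceID helper reviewed further down.

package event

import (
	"fmt"

	cloudevents "github.com/cloudevents/sdk-go/v2"
	v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// newResourceEvent sketches attaching resource identity to a cloudevent.
func newResourceEvent(meta v1.ObjectMeta) *cloudevents.Event {
	ev := cloudevents.NewEvent()
	ev.SetSource("argocd-agent")
	ev.SetType("application.spec-update")
	// The UID pins the object's identity; the resourceVersion pins the
	// exact revision, so an ACK can be matched to the latest state.
	_ = ev.SetExtension("resourceid", fmt.Sprintf("%s_%s", meta.Name, meta.UID))
	_ = ev.SetExtension("eventid", meta.ResourceVersion)
	return &ev
}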

Note: I think there are multiple ways of designing this and I'm happy to update the approach based on the feedback.

Other choices that were considered:

  1. We send events on a stream and immediately wait for an ACK, using apimachinery's wait.ExponentialBackoff to resend events until we receive one (see the sketch after this list). However, events from other resources will be blocked until the current event is processed.
  2. We can improve the above approach by running wait.ExponentialBackoff in a separate goroutine for each app. However, each goroutine is short-lived (it only exists until we receive an ACK), and with a large number of apps the runtime might spend significant time context switching.
  3. We could potentially solve this using a pool of goroutines (a boss-worker pattern). But I still went with the current approach (continuously loop and resend) since it adds minimal delay and we don't have to wait for ACKs sequentially.
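For reference, option 1 would look roughly like the sketch below. This is not what the PR implements; send and acked are hypothetical stand-ins for the gRPC stream send and the ACK bookkeeping.

package event

import (
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

// resendUntilAck sketches option 1: send an event, then block and retry
// with exponential backoff until the peer ACKs it.
func resendUntilAck(send func() error, acked func() bool) error {
	backoff := wait.Backoff{
		Duration: 100 * time.Millisecond, // initial retry delay
		Factor:   2.0,                    // double the delay on each attempt
		Jitter:   0.1,
		Steps:    5, // give up after five attempts
	}
	return wait.ExponentialBackoff(backoff, func() (bool, error) {
		if err := send(); err != nil {
			return false, nil // transient send failure: retry after backoff
		}
		// Done only once the ACK has arrived; every other event on this
		// stream is blocked in the meantime, which is the drawback above.
		return acked(), nil
	})
}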

Fixes: #117

codecov-commenter commented Nov 18, 2024

Codecov Report

Attention: Patch coverage is 44.24460% with 155 lines in your changes missing coverage. Please review.

Project coverage is 47.21%. Comparing base (72eaea7) to head (2525ac2).

Files with missing lines                  | Patch % | Lines
internal/event/event.go                    | 58.49%  | 62 Missing and 4 partials ⚠️
principal/event.go                         | 26.00%  | 36 Missing and 1 partial ⚠️
agent/connection.go                        | 0.00%   | 24 Missing ⚠️
agent/inbound.go                           | 23.80%  | 14 Missing and 2 partials ⚠️
principal/apis/eventstream/eventstream.go  | 50.00%  | 9 Missing and 3 partials ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main     #225      +/-   ##
==========================================
+ Coverage   46.82%   47.21%   +0.38%     
==========================================
  Files          57       57              
  Lines        4952     5151     +199     
==========================================
+ Hits         2319     2432     +113     
- Misses       2452     2529      +77     
- Partials      181      190       +9     

☔ View full report in Codecov by Sentry.

@chetan-rns changed the title from "fix: resend events with exponential backoff and wait for ACK" to "feat: resend events with exponential backoff and wait for ACK" on Nov 18, 2024
@chetan-rns force-pushed the agent-principal-resync branch 2 times, most recently from cd3e618 to 873c8f9, on November 18, 2024 19:01
@chetan-rns marked this pull request as ready for review on November 18, 2024 19:01
return nil
}
}
logCtx.Tracef("Adding an item to the event writer: resourceID %s eventID %s", event.ResourceID(ev), event.EventID(ev))
Collaborator:
Can you please use structured fields for resourceID and eventID here, e.g.

logCtx.WithField("resource_id", event.ResourceID(ev)).WithField("event_id", event.EventID(ev)).Trace(...)

Same goes for the other calls to the logger where this data goes. I think it will greatly help with correlating the logs later on, especially if you ingest the logs into a tokenizer or use JSON logging.

jannfis (Collaborator) commented Nov 19, 2024

Great stuff, @chetan-rns!

I'm starting a review cycle. Please bear with me, as my reviews are dripping in :)

jannfis (Collaborator) left a comment:

Looks pretty good to me, except for the logging.

Going forward, we may want to augment the event writer with metrics and potentially rate limiting or a size cap. But that's something for different PRs.

}
}

func (ew *EventWriter) SendWaitingEvents(ctx context.Context) {
Collaborator:

It should probably be noted that this function never returns unless the context is done, and therefore should be started in a goroutine.
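Concretely, the suggested note could be a doc comment plus the intended call pattern (a sketch based on this comment, not the PR's actual wording):

// SendWaitingEvents continuously resends events that have not yet been
// ACKed. It does not return until ctx is done, so it must be started in
// its own goroutine:
//
//	go eventWriter.SendWaitingEvents(ctx)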

jgwest (Member) left a comment:

Looks great @chetan-rns, nice work!

In addition to some suggestions re: logic/mutex, I've added some feedback around 1) the need to use the UID value from the source of truth, and 2) not discarding messages if the agent hasn't connected yet.

  • IMHO these are both within scope of this PR/issue, but if you feel that these are out of scope for the issue, or that you would prefer we work on them as part of a separate PR, that's fine with me.


// key: AgentName
// value: EventWriter for that agent
eventWriters map[string]*event.EventWriter
Member:

From reading through the code, it appears eventWriters requires a mutex for when it is read/modified, for example, in Subscribe func in this file. AFAICT the existing code flow does not prevent concurrent modification.
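A minimal sketch of what that could look like; the mutex field and accessor names are assumptions, not the PR's actual code:

type Server struct {
	ewMutex sync.RWMutex // guards eventWriters

	// key: AgentName
	// value: EventWriter for that agent
	eventWriters map[string]*event.EventWriter
}

// eventWriterFor looks up the writer for an agent under the read lock.
func (s *Server) eventWriterFor(agentName string) (*event.EventWriter, bool) {
	s.ewMutex.RLock()
	defer s.ewMutex.RUnlock()
	ew, ok := s.eventWriters[agentName]
	return ew, ok
}

// setEventWriter installs the writer for an agent under the write lock,
// e.g. from Subscribe.
func (s *Server) setEventWriter(agentName string, ew *event.EventWriter) {
	s.ewMutex.Lock()
	defer s.ewMutex.Unlock()
	s.eventWriters[agentName] = ew
}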

return
}

err = ew.target.Send(&eventstreamapi.Event{Event: pev})
Member:

Can add this comment which was moved from elsewhere in the code:

	// A Send() on the stream is actually not blocking.

logCtx.Tracef("Received an ACK: resourceID %s eventID %s", event.ResourceID(incomingEvent), event.EventID(incomingEvent))
eventWriter, exists := s.eventWriters[c.agentName]
if !exists {
return fmt.Errorf("panic: event writer not found for agent %s", c.agentName)
Member:

Errors returned from this function will cause the entire client <-> agent connection/event processing logic to terminate. Is that what we want in this case?

@@ -189,6 +195,17 @@ func (s *Server) recvFunc(c *client, subs eventstreamapi.EventStream_SubscribeSe
return fmt.Errorf("panic: no recvq for agent %s", c.agentName)
}

if event.Target(incomingEvent) == event.TargetEventAck {
logCtx.Tracef("Received an ACK: resourceID %s eventID %s", event.ResourceID(incomingEvent), event.EventID(incomingEvent))
eventWriter, exists := s.eventWriters[c.agentName]
Member:

As above, I think a mutex is needed for this.

if !exists {
return
}

Member:

We need to acquire latestEvent's mutex here, since we are reading from it.
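For instance (a sketch; the field and mutex names on latestEvent are assumed here):

// Take latestEvent's lock before reading from it, mirroring the
// locking on the write path.
latestEvent.mu.RLock()
ev := latestEvent.event
latestEvent.mu.RUnlock()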

return fmt.Errorf("panic: could not serialize event to wire format: %v", err)
eventWriter, exists := s.eventWriters[c.agentName]
if !exists {
return fmt.Errorf("panic: event writer not found for agent %s", c.agentName)
Member:

At the moment, it appears that if the principal attempts to send a message to the agent before the agent has connected, the message will be discarded.

This causes a problem in, for example, this case:

  1. Principal process stops.
  2. User makes changes to one or more Argo CD Application/AppProjects/etc.
  3. Principal process starts.
  4. Principal process calls 'StartInformer' to set up the K8s watch and start processing K8s events
  5. Principal receives events for all watched resources from informer, e.g. Applications, and attempts to send the new Application state (from step 2) to agent
    • But agent hasn't connected yet, so the events are discarded.
  6. Agent connects.
    • But agent missed the initial Application updates, above, so no new messages are sent.

You can see an example of this in principal/apis/eventstream/eventstream.go in sendFunc, where:

  • If there is no send queue (outbox) for an agent (by name), the event is discarded and we stop processing events
  • If there IS a send queue, but not an eventWriter for that agent (by name), we likewise discard the event and stop processing events.

AFAICT the solution to this is either:

  • A) we keep agent events in the queue, even if the agent hasn't connected yet (don't discard them).
    • The only issue is how to know what the name of the agent is before it connects, and I presume we can get this from the namespace on principal side (in the managed agent case)
  • or, B) when an agent first connects (after principal OS process startup), we manually re-queue all events for all resources in the agent namespace on principal (e.g. we cause all the events to be re-reconciled by the informer, which causes messages containing the current contents to be sent).

My personal preference is A).
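A sketch of option A, with the agent name derived from the namespace so the writer can be created lazily and events queue up before the agent connects. agentNameFromNamespace and the NewEventWriter signature are assumptions, and ewMutex follows the mutex sketch above:

// writerFor never discards: if the agent has not connected yet, the
// EventWriter is created up front so events are queued rather than dropped.
func (s *Server) writerFor(namespace string) *event.EventWriter {
	agentName := agentNameFromNamespace(namespace) // managed agents: namespace maps to agent name
	s.ewMutex.Lock()
	defer s.ewMutex.Unlock()
	ew, ok := s.eventWriters[agentName]
	if !ok {
		ew = event.NewEventWriter(nil) // no stream yet; attach it when the agent subscribes
		s.eventWriters[agentName] = ew
	}
	return ew
}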

cev.SetDataSchema(TargetApplication.String())
// TODO: Handle this error situation?
_ = cev.SetData(cloudevents.ApplicationJSON, app)
return &cev
}

func createResourceID(res v1.ObjectMeta) string {
return fmt.Sprintf("%s_%s", res.Name, res.UID)
Member:

Currently, in this PR, the UID is taken from .metadata. In order to ensure consistency between source of truth (e.g. principal) and peer (e.g. managed agent), we need to use the UID from the source of truth on both sides:

  • Resources events are sent between principal <-> agent
  • The UID of the resource (application/appproject) is now included in the event as part of this PR.
  • However, the uid value MUST be the same on both the principal and agent sides.
  • Our logic would be: when the agent/principal sees an event for a resource (e.g. application) with a matching name but without a matching UID, it should assume that the object with the old UID was deleted.
  • At present (correct me if wrong!), this PR is using .metadata.UID on both the principal and agent sides. This is a problem (one we will likely hit later), as this value will be different on each side.
    • The .metadata.uid value is generated by K8s when the object is created.
    • However, this means it is generated twice: once on the principal side when the Application is created in the principal namespace, and again on the agent side, when the agent completes the Application sync from principal.
    • Since they are being created on separate clusters, and the UID value is not used on creation, the values will be different.
  • The canonical UID for a resource should come from the source of truth for that resource:
    • For managed agent, the .metadata.UID on principal is the source of truth
    • For autonomous agent, the .metadata.UID on agent is the source of truth.
  • So, we can still use .metadata.UID when sending events from the source of truth (principal), but the non-source-of-truth peer needs a way to know which UID to use.

Proposed Solution:

  • For this, I recommend we add an annotation like application-uid: (uid from principal) to the resource on the agent (i.e. non-source-of-truth) side.
  • Then, when the agent is processing an event, it can use this UID to judge whether or not the event from principal still applies (and vice versa):
    • If the UID matches, the event still applies. If the UID doesn't match, the event no longer applies.
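A sketch of the proposed check on the non-source-of-truth side. The annotation key shown is illustrative (a real key would likely carry a domain prefix), and the helper is hypothetical:

// The source-of-truth UID travels with the resource as an annotation.
const uidAnnotation = "application-uid" // illustrative key

// eventStillApplies reports whether an event from the source of truth
// still refers to the local copy: a UID mismatch means the original
// object was deleted and recreated, so the event is stale.
func eventStillApplies(local v1.ObjectMeta, sourceUID string) bool {
	return local.Annotations[uidAnnotation] == sourceUID
}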
