feat: resend events with exponential backoff and wait for ACK #225
base: main
Conversation
Codecov Report
Attention: Patch coverage is
Additional details and impacted files:

@@            Coverage Diff             @@
##             main     #225      +/-   ##
==========================================
+ Coverage   46.82%   47.21%   +0.38%
==========================================
  Files          57       57
  Lines        4952     5151     +199
==========================================
+ Hits         2319     2432     +113
- Misses       2452     2529      +77
- Partials      181      190       +9

View full report in Codecov by Sentry.
Force-pushed from edf89c7 to ce57e29.
Force-pushed from cd3e618 to 873c8f9.
return nil
}
}
logCtx.Tracef("Adding an item to the event writer: resourceID %s eventID %s", event.ResourceID(ev), event.EventID(ev))
Can you please use structured fields for resourceID and eventID here, e.g.
logCtx.WithField("resource_id", event.ResourceID(ev)).WithField("event_id", event.EventID(ev)).Trace(...)
The same goes for the other calls to the logger where this data goes. I think it will greatly help with correlating the logs later on, especially if you ingest the logs into a tokenizer or use JSON logging.
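For illustration, this is roughly what the structured form could look like with a logrus-style logger (a sketch only; the field names are just suggestions):

```go
// Sketch: emit the resource/event IDs as structured fields instead of
// interpolating them into the message string.
logCtx.WithFields(logrus.Fields{
	"resource_id": event.ResourceID(ev),
	"event_id":    event.EventID(ev),
}).Trace("Adding an item to the event writer")
```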
Great stuff, @chetan-rns! I'm starting a review cycle. Please bear with me, as my reviews are trickling in :)
Signed-off-by: Chetan Banavikalmutt <[email protected]>
Force-pushed from 873c8f9 to 2525ac2.
Looks pretty good to me, except for the logging.
Going forward, we may want to augment the event writer with metrics and potentially rate limiting or a size cap. But that's something for different PRs.
}
}

func (ew *EventWriter) SendWaitingEvents(ctx context.Context) {
It should probably be noted that this function will never return unless the context is done, and therefore it should be started in a goroutine.
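For example (a sketch only; variable names are assumed), the caller would run it in a goroutine tied to a cancellable context:

```go
// Sketch: SendWaitingEvents blocks until the context is done, so start it in
// its own goroutine and stop it by cancelling the context on shutdown.
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

go ew.SendWaitingEvents(ctx)
```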
Looks great @chetan-rns, nice work!
In addition to some suggestions re: logic/mutex, I've added some feedback around 1) the need to use the UID value from the source of truth, and 2) not discarding messages if the agent hasn't connected yet.
IMHO these are both within the scope of this PR/issue, but if you feel that they are out of scope for the issue, or that you would prefer we work on them as part of a separate PR, that's fine with me.
// key: AgentName
// value: EventWriter for that agent
eventWriters map[string]*event.EventWriter
From reading through the code, it appears eventWriters requires a mutex for when it is read/modified, for example, in the Subscribe func in this file. AFAICT the existing code flow does not prevent concurrent modification.
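A minimal sketch of one way to guard the map, assuming it lives on the Server struct (field and method names here are illustrative, not the PR's actual code):

```go
// Sketch: serialize access to eventWriters with an RWMutex so Subscribe and
// the ACK-handling path cannot read and modify the map concurrently.
type Server struct {
	mu           sync.RWMutex
	eventWriters map[string]*event.EventWriter
}

func (s *Server) getEventWriter(agentName string) (*event.EventWriter, bool) {
	s.mu.RLock()
	defer s.mu.RUnlock()
	ew, ok := s.eventWriters[agentName]
	return ew, ok
}

func (s *Server) setEventWriter(agentName string, ew *event.EventWriter) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.eventWriters[agentName] = ew
}
```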
return
}

err = ew.target.Send(&eventstreamapi.Event{Event: pev})
You can add this comment here, which was moved from elsewhere in the code:
// A Send() on the stream is actually not blocking.
logCtx.Tracef("Received an ACK: resourceID %s eventID %s", event.ResourceID(incomingEvent), event.EventID(incomingEvent))
eventWriter, exists := s.eventWriters[c.agentName]
if !exists {
	return fmt.Errorf("panic: event writer not found for agent %s", c.agentName)
Errors returned from this function will cause the entire client <-> agent connection/event processing logic to terminate. Is that what we want in this case?
@@ -189,6 +195,17 @@ func (s *Server) recvFunc(c *client, subs eventstreamapi.EventStream_SubscribeSe
	return fmt.Errorf("panic: no recvq for agent %s", c.agentName)
}

if event.Target(incomingEvent) == event.TargetEventAck {
	logCtx.Tracef("Received an ACK: resourceID %s eventID %s", event.ResourceID(incomingEvent), event.EventID(incomingEvent))
	eventWriter, exists := s.eventWriters[c.agentName]
As above, I think a mutex is needed for this.
if !exists {
	return
}
Need to acquire latestEvent's mutex here, since we are reading from it.
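Something along these lines is what is meant (a sketch only; the mutex and field names are hypothetical):

```go
// Sketch: hold the read lock for the duration of the read so a concurrent
// writer cannot swap latestEvent out from under us.
ew.mu.RLock()
latest := ew.latestEvent
ew.mu.RUnlock()
```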
return fmt.Errorf("panic: could not serialize event to wire format: %v", err)
eventWriter, exists := s.eventWriters[c.agentName]
if !exists {
	return fmt.Errorf("panic: event writer not found for agent %s", c.agentName)
At the moment, it appears that if the principal attempts to send a message to the agent before the agent has connected, the message will be discarded.
This causes a problem in a case like the following:
1. Principal process stops.
2. User makes changes to one or more Argo CD Applications/AppProjects/etc.
3. Principal process starts.
4. Principal process calls 'StartInformer' to set up the K8s watch and start processing K8s events.
5. Principal receives events for all watched resources from the informer, e.g. Applications, and attempts to send the new Application state (from step 2) to the agent.
6. But the agent hasn't connected yet, so the events are discarded.
7. Agent connects.
8. But the agent missed the initial Application updates above, so no new messages are sent.
You can see an example of this in principal/apis/eventstream/eventstream.go in sendFunc, where:
- If there is no send queue (outbox) for an agent (by name), the event is discarded and we stop processing events.
- If there IS a send queue, but no eventWriter for that agent (by name), we likewise discard the event and stop processing events.
AFAICT the solution to this is either:
- A) we keep agent events in the queue, even if the agent hasn't connected yet (don't discard them); see the sketch after this list.
  - The only issue is how to know the name of the agent before it connects, and I presume we can get this from the namespace on the principal side (in the managed agent case).
- or B) when an agent first connects (after principal OS process startup), we manually re-queue all events for all resources in the agent namespace on the principal (e.g. we cause all the events to be re-reconciled by the informer, which causes messages to be sent containing the contents).
My personal preference is A).
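To make option A a bit more concrete, here is a rough sketch of buffering events for an agent that has not connected yet (all names and the queue type are assumptions for illustration, not code from this PR):

```go
// Sketch of option A: look up (or lazily create) the agent's send queue, so
// events produced before the agent connects are buffered instead of dropped.
func (s *Server) enqueueEvent(agentName string, ev *cloudevents.Event) {
	s.mu.Lock()
	q, ok := s.sendQueues[agentName]
	if !ok {
		// The agent has not connected yet: create its queue now and keep the
		// event until the agent subscribes and starts draining the queue.
		q = newEventQueue()
		s.sendQueues[agentName] = q
	}
	s.mu.Unlock()

	q.Add(ev)
}
```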
cev.SetDataSchema(TargetApplication.String())
// TODO: Handle this error situation?
_ = cev.SetData(cloudevents.ApplicationJSON, app)
return &cev
}

func createResourceID(res v1.ObjectMeta) string {
	return fmt.Sprintf("%s_%s", res.Name, res.UID)
Currently, in this PR, the UID is taken from .metadata. In order to ensure consistency between the source of truth (e.g. principal) and the peer (e.g. managed agent), we need to use the UID from the source of truth on both sides:
- Resource events are sent between principal <-> agent.
- The UID of the resource (Application/AppProject) is now included as part of the event, as part of this PR.
- However, the UID value MUST be the same on both the principal and agent sides.
- Our logic would be: when the agent/principal sees an event for a resource (e.g. Application) with a matching name but without a matching UID, it should assume that the object with the old UID was deleted.
- At present (correct me if wrong!), this PR is using .metadata.uid on both the principal and agent sides. This is a problem (likely one we will hit later), as this value will be different on the two sides.
  - The .metadata.uid value is generated by K8s when the object is created.
  - However, this means it is generated twice: once on the principal side when the Application is created in the principal namespace, and again on the agent side when the agent completes the Application sync from the principal.
  - Since they are created on separate clusters, and the UID value is not used on creation, the values will be different.
- The canonical UID for a resource should come from the source of truth for that resource:
  - For a managed agent, the .metadata.uid on the principal is the source of truth.
  - For an autonomous agent, the .metadata.uid on the agent is the source of truth.
- So, we can still use .metadata.uid when sending events from the source of truth (principal), but the non-source-of-truth peer needs a way to know which UID to use.
Proposed Solution:
- For this, I recommend we add an annotation like application-uid: (uid from principal) to the resource on the agent (i.e. non-source-of-truth) side.
- Then, when the agent is processing an event, it can use this UID to judge whether or not the event from the principal still applies (and vice versa).
- If the UID matches, the event still applies. If the UID doesn't match, the event no longer applies (a rough sketch of this check follows).
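A rough sketch of the proposed check on the agent side (the annotation key and helper name are made up for illustration; the Application type is the Argo CD v1alpha1 type):

```go
// Sketch: decide whether an incoming event from the principal still applies to
// the local copy of the Application by comparing the source-of-truth UID
// (stored in an annotation) against the UID carried in the event.
func eventStillApplies(localApp *v1alpha1.Application, eventUID string) bool {
	sourceUID, ok := localApp.Annotations["application-uid"]
	if !ok {
		// No annotation yet: we cannot correlate this object with the event,
		// so treat the event as not applicable (or trigger a resync).
		return false
	}
	// If the UIDs differ, the object the event refers to was deleted and
	// recreated on the source of truth, so the event no longer applies.
	return sourceUID == eventUID
}
```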
This PR adds an ACK that is sent by the principal/agent when an event is processed successfully. An EventWriter keeps track of the latest events and sends them on the gRPC stream. Both the agent and the principal will add events to the EventWriter instead of directly sending them to the gRPC stream. The EventWriter will send the events with exponential backoff until we receive an ACK from the other side. It continuously loops over the waiting events and sends them to the other side with minimal delay. The goal is to propagate the events as soon as possible, with minimal delay and without blocking the other events.

The PR also adds the UID (resourceID) and resourceVersion (eventID) to the cloudevent as extensions. We do this to map an event to a particular state of a resource. For example, when we receive an ACK for an event we want to know if the ACK is for the latest version of the resource.

Note: I think there are multiple ways of designing this and I'm happy to update the approach based on the feedback.
Other choices that were considered:
Fixes: #117
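For context, a minimal sketch of the resend-until-ACK loop described above, using a capped exponential backoff (illustrative only; it does not mirror the PR's actual EventWriter internals):

```go
package main

import (
	"context"
	"time"
)

// resendUntilAck keeps resending an event until an ACK for it has been
// observed or the context is cancelled, doubling the wait between attempts
// up to a cap.
func resendUntilAck(ctx context.Context, send func() error, acked func() bool) {
	backoff := 500 * time.Millisecond
	const maxBackoff = 30 * time.Second

	for {
		if acked() {
			return
		}
		// Best-effort send; on failure we simply wait and retry below.
		_ = send()

		select {
		case <-ctx.Done():
			return
		case <-time.After(backoff):
		}

		backoff *= 2
		if backoff > maxBackoff {
			backoff = maxBackoff
		}
	}
}
```

As described above, the real EventWriter tracks the latest event per resource and loops over all waiting events, so a newer version of a resource supersedes an older pending one; the sketch only shows the retry control flow for a single event.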