Skip to content

Commit

Permalink
update trace level log format
Browse files Browse the repository at this point in the history
Signed-off-by: Chetan Banavikalmutt <[email protected]>
  • Loading branch information
chetan-rns committed Nov 18, 2024
1 parent 0a4b442 commit 873c8f9
Show file tree
Hide file tree
Showing 4 changed files with 17 additions and 12 deletions.
8 changes: 4 additions & 4 deletions agent/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func (a *Agent) sender(stream eventstreamapi.EventStream_SubscribeClient) error
return nil
}

logCtx.Trace("Adding an item to the event writer", "resourceID", event.ResourceID(ev), "eventID", event.EventID(ev))
logCtx.Tracef("Adding an item to the event writer: resourceID %s eventID %s", event.ResourceID(ev), event.EventID(ev))
a.eventWriter.Add(ev)

return nil
Expand Down Expand Up @@ -117,13 +117,13 @@ func (a *Agent) receiver(stream eventstreamapi.EventStream_SubscribeClient) erro
logCtx.Debugf("Received a new event from stream")

if ev.Target() == event.TargetEventAck {
logCtx.Trace("Received an ACK for an event", "resourceID", ev.ResourceID(), "eventID", ev.EventID())
logCtx.Tracef("Received an ACK for an event: resourceID %s eventID %s", ev.ResourceID(), ev.EventID())
rawEvent, err := format.FromProto(rcvd.Event)
if err != nil {
return err
}
a.eventWriter.Remove(rawEvent)
logCtx.Trace("Removed an event from the EventWriter", "resourceID", ev.ResourceID(), "eventID", ev.EventID())
logCtx.Tracef("Removed an event from the EventWriter: resourceID %s eventID %s", ev.ResourceID(), ev.EventID())
return nil
}

Expand All @@ -137,7 +137,7 @@ func (a *Agent) receiver(stream eventstreamapi.EventStream_SubscribeClient) erro
return fmt.Errorf("no send queue found for the remote principal")
}
sendQ.Add(a.emitter.ProcessedEvent(event.EventProcessed, ev))
logCtx.Info("Sent an ACK for an event", "resourceID", ev.ResourceID(), "eventID", ev.EventID())
logCtx.Tracef("Sent an ACK for an event: resourceID %s eventID %s", ev.ResourceID(), ev.EventID())
}
return nil
}
Expand Down
15 changes: 10 additions & 5 deletions internal/event/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,7 @@ func (ew *EventWriter) Add(ev *cloudevents.Event) {
event: ev,
backoff: &defaultBackoff,
}
ew.log.Tracef("added a new event resourceID %s eventID %s", resID, EventID(ev))
return
}

Expand All @@ -316,6 +317,7 @@ func (ew *EventWriter) Add(ev *cloudevents.Event) {
eventMsg.backoff = &defaultBackoff
eventMsg.retryAfter = nil
eventMsg.mu.Unlock()
ew.log.Tracef("updated an existing event: resourceID %s eventID %s", resID, EventID(ev))
}

func (ew *EventWriter) Get(resID string) *eventMessage {
Expand Down Expand Up @@ -367,16 +369,19 @@ func (ew *EventWriter) sendEvent(resID string) {
// Check if the event is already ACK'd.
eventMsg := ew.Get(resID)
if eventMsg == nil {
ew.log.Trace("event is not found, perhaps it is already ACK'd", "resourceID", resID)
ew.log.Tracef("event is not found, perhaps it is already ACK'd: resourceID %s", resID)
return
}

eventMsg.mu.Lock()
defer eventMsg.mu.Unlock()

// Check if it is time to resend the event.
if eventMsg.retryAfter != nil && eventMsg.retryAfter.After(time.Now()) {
return
if eventMsg.retryAfter != nil {
if eventMsg.retryAfter.After(time.Now()) {
return
}
ew.log.Tracef("resending an event: resourceID %s eventID %s", resID, EventID(eventMsg.event))
}

defer func() {
Expand All @@ -398,11 +403,11 @@ func (ew *EventWriter) sendEvent(resID string) {
return
}

ew.log.Trace("event sent to target", "resourceID", resID, "type", eventMsg.event.Type())
ew.log.Tracef("event sent to target: resourceID %s eventID %s", resID, EventID(eventMsg.event))

// We don't have to wait for an ACK if the current event is ACK. So, remove it from the EventWriter.
if Target(eventMsg.event) == TargetEventAck {
ew.Remove(eventMsg.event)
ew.log.Trace("ACK is removed from the EventWriter", "resourceID", resID, "eventID", EventID(eventMsg.event))
ew.log.Tracef("ACK is removed from the EventWriter: resourceID: %s eventID: %s", resID, EventID(eventMsg.event))
}
}
4 changes: 2 additions & 2 deletions principal/apis/eventstream/eventstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ func (s *Server) recvFunc(c *client, subs eventstreamapi.EventStream_SubscribeSe
}

if event.Target(incomingEvent) == event.TargetEventAck {
logCtx.Trace("Received an ACK event", "resourceID", event.ResourceID(incomingEvent), "eventID", event.EventID(incomingEvent))
logCtx.Tracef("Received an ACK: resourceID %s eventID %s", event.ResourceID(incomingEvent), event.EventID(incomingEvent))
s.eventWriter.Remove(incomingEvent)
logCtx.Trace("Removed the ACK from the EventWriter")
return nil
Expand Down Expand Up @@ -235,7 +235,7 @@ func (s *Server) sendFunc(c *client, subs eventstreamapi.EventStream_SubscribeSe
return fmt.Errorf("panic: invalid data in sendqueue: want: %T, have %T", cloudevents.Event{}, item)
}

logCtx.Trace("Adding an item to the event writer", "resourceID", event.ResourceID(ev), "eventID", event.EventID(ev))
logCtx.Tracef("Adding an item to the event writer: resourceID %s eventID %s", event.ResourceID(ev), event.EventID(ev))
s.eventWriter.Add(ev)

q.Done(item)
Expand Down
2 changes: 1 addition & 1 deletion principal/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,7 @@ func (s *Server) eventProcessor(ctx context.Context) error {
logCtx.Debugf("Queue disappeared -- client probably has disconnected")
return
}
logCtx.Trace("sending an ACK", "resourceID", event.ResourceID(ev), "eventID", event.EventID(ev))
logCtx.Tracef("sending an ACK: resourceID %s eventID %s", event.ResourceID(ev), event.EventID(ev))
sendQ.Add(s.events.ProcessedEvent(event.EventProcessed, event.New(ev, event.TargetEventAck)))
}(queueName, q)
}
Expand Down

0 comments on commit 873c8f9

Please sign in to comment.