Skip to content

Commit

Permalink
insert gears traceId into the gears payload if it is not there
Browse files Browse the repository at this point in the history
  • Loading branch information
plaxomike committed Aug 20, 2024
1 parent c9e0c00 commit 3a8b6b5
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 2 deletions.
24 changes: 22 additions & 2 deletions pkg/plugins/gears/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -457,7 +457,24 @@ func (s *Sender) Send(e event.Event) {
e.Ack()
return
}
buf, err := json.Marshal(e.Payload())
//if tx.traceId not set and if e.UserTraceId() is set, set tx.traceId to e.UserTraceId()
payload, ok := e.Payload().(map[string]interface{})
if ok {
tx, ok := payload["tx"].(map[string]interface{})
if ok {
_, ok := tx["traceId"]
if !ok && e.UserTraceId() != "" {
tx["traceId"] = e.UserTraceId()
payload["tx"] = tx
}
} else {
if e.UserTraceId() != "" {
payload["tx"] = map[string]interface{}{"traceId": e.UserTraceId()}
}
}
}

buf, err := json.Marshal(payload)
if err != nil {
log.Ctx(e.Context()).Error().Str("op", "gears.Send").Str("name", s.Name()).Str("tid", s.Tenant().ToString()).Msg("failed to marshal message: " + err.Error())
s.getMetrics(s.getLabelValues(e, s.config.DynamicMetricLabels)).eventFailureCounter.Add(e.Context(), 1.0)
Expand All @@ -484,7 +501,10 @@ func (s *Sender) Send(e event.Event) {
s.LogSuccess()
s.Lock()
s.count++
log.Ctx(e.Context()).Info().Str("op", "gears.Send").Str("name", s.Name()).Uint32("hash", h.Sum32()).Str("producer", s.producers[pIdx].key).Str("location", location).Str("tid", s.Tenant().ToString()).Int("count", s.count).Msg("sent enveloped message on gears topic")
log.Ctx(e.Context()).Info().Str("op", "gears.Send").
Str("name", s.Name()).Uint32("hash", h.Sum32()).Str("producer", s.producers[pIdx].key).
Str("location", location).Str("tid", s.Tenant().ToString()).Int("count", s.count).
Msg("sent enveloped message on gears topic")
s.Unlock()
} else {
to := make(map[string]string, 0)
Expand Down
16 changes: 16 additions & 0 deletions pkg/plugins/gears/sender_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package gears

import (
"fmt"
"hash/fnv"
"testing"
)

func TestLocationHash(t *testing.T) {
location := "MChian100"
hashbuf := []byte(location)
h := fnv.New32a()
h.Write(hashbuf)
pIdx := getProducerIdx(h.Sum32(), 2)
fmt.Printf("index=%d\n", pIdx)
}

0 comments on commit 3a8b6b5

Please sign in to comment.