diff --git a/pkg/plugins/gears/sender.go b/pkg/plugins/gears/sender.go index 8992b5d1..8163e9a5 100644 --- a/pkg/plugins/gears/sender.go +++ b/pkg/plugins/gears/sender.go @@ -457,7 +457,24 @@ func (s *Sender) Send(e event.Event) { e.Ack() return } - buf, err := json.Marshal(e.Payload()) + //if tx.traceId not set and if e.UserTraceId() is set, set tx.traceId to e.UserTraceId() + payload, ok := e.Payload().(map[string]interface{}) + if ok { + tx, ok := payload["tx"].(map[string]interface{}) + if ok { + _, ok := tx["traceId"] + if !ok && e.UserTraceId() != "" { + tx["traceId"] = e.UserTraceId() + payload["tx"] = tx + } + } else { + if e.UserTraceId() != "" { + payload["tx"] = map[string]interface{}{"traceId": e.UserTraceId()} + } + } + } + + buf, err := json.Marshal(payload) if err != nil { log.Ctx(e.Context()).Error().Str("op", "gears.Send").Str("name", s.Name()).Str("tid", s.Tenant().ToString()).Msg("failed to marshal message: " + err.Error()) s.getMetrics(s.getLabelValues(e, s.config.DynamicMetricLabels)).eventFailureCounter.Add(e.Context(), 1.0) @@ -484,7 +501,10 @@ func (s *Sender) Send(e event.Event) { s.LogSuccess() s.Lock() s.count++ - log.Ctx(e.Context()).Info().Str("op", "gears.Send").Str("name", s.Name()).Uint32("hash", h.Sum32()).Str("producer", s.producers[pIdx].key).Str("location", location).Str("tid", s.Tenant().ToString()).Int("count", s.count).Msg("sent enveloped message on gears topic") + log.Ctx(e.Context()).Info().Str("op", "gears.Send"). + Str("name", s.Name()).Uint32("hash", h.Sum32()).Str("producer", s.producers[pIdx].key). + Str("location", location).Str("tid", s.Tenant().ToString()).Int("count", s.count). + Msg("sent enveloped message on gears topic") s.Unlock() } else { to := make(map[string]string, 0) diff --git a/pkg/plugins/gears/sender_test.go b/pkg/plugins/gears/sender_test.go new file mode 100644 index 00000000..4e0f64ca --- /dev/null +++ b/pkg/plugins/gears/sender_test.go @@ -0,0 +1,16 @@ +package gears + +import ( + "fmt" + "hash/fnv" + "testing" +) + +func TestLocationHash(t *testing.T) { + location := "MChian100" + hashbuf := []byte(location) + h := fnv.New32a() + h.Write(hashbuf) + pIdx := getProducerIdx(h.Sum32(), 2) + fmt.Printf("index=%d\n", pIdx) +}