From eb5cd674b07ba3e1f0970d5d0879fd221fe74cc3 Mon Sep 17 00:00:00 2001 From: afzalabbasi Date: Fri, 28 Jun 2024 15:08:43 +0500 Subject: [PATCH 01/11] fix duration bug in case of golang profile. --- receiver/pyroscopereceiver/receiver.go | 1 - 1 file changed, 1 deletion(-) diff --git a/receiver/pyroscopereceiver/receiver.go b/receiver/pyroscopereceiver/receiver.go index 56ad23c..dd5648a 100644 --- a/receiver/pyroscopereceiver/receiver.go +++ b/receiver/pyroscopereceiver/receiver.go @@ -392,7 +392,6 @@ func (r *pyroscopeReceiver) readProfiles(ctx context.Context, req *http.Request, } else { timestampNs = pm.start durationNs = pm.end - pm.start - durationNs = ns(durationNs) } record.SetTimestamp(pcommon.Timestamp(timestampNs)) m := record.Attributes() From d60576d5253e776647cc8e1098052ee0224e9d5d Mon Sep 17 00:00:00 2001 From: tomershafir Date: Tue, 8 Oct 2024 15:15:31 +0300 Subject: [PATCH 02/11] pyroscope-receiver: remove service label from hists Prometheus histograms are very expensive. This change removes service_name label from histograms, because it can explode cardinality. --- receiver/pyroscopereceiver/receiver.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/receiver/pyroscopereceiver/receiver.go b/receiver/pyroscopereceiver/receiver.go index 56ad23c..b5ba82c 100644 --- a/receiver/pyroscopereceiver/receiver.go +++ b/receiver/pyroscopereceiver/receiver.go @@ -358,7 +358,7 @@ func (r *pyroscopeReceiver) readProfiles(ctx context.Context, req *http.Request, return logs, fmt.Errorf("failed to decompress body: %w", err) } // TODO: try measure compressed size - otelcolReceiverPyroscopeRequestBodyUncompressedSizeBytes.Record(ctx, int64(buf.Len()), metric.WithAttributeSet(*newOtelcolAttrSetPayloadSizeBytes(pm.name, formatJfr, ""))) + otelcolReceiverPyroscopeRequestBodyUncompressedSizeBytes.Record(ctx, int64(buf.Len()), metric.WithAttributeSet(*newOtelcolAttrSetPayloadSizeBytes(formatJfr, ""))) resetHeaders(req) md := profile_types.Metadata{SampleRateHertz: 0} @@ -422,7 +422,7 @@ func (r *pyroscopeReceiver) readProfiles(ctx context.Context, req *http.Request, ) } // sz may be 0 and it will be recorded - otelcolReceiverPyroscopeParsedBodyUncompressedSizeBytes.Record(ctx, int64(sz), metric.WithAttributeSet(*newOtelcolAttrSetPayloadSizeBytes(pm.name, formatPprof, ""))) + otelcolReceiverPyroscopeParsedBodyUncompressedSizeBytes.Record(ctx, int64(sz), metric.WithAttributeSet(*newOtelcolAttrSetPayloadSizeBytes(formatPprof, ""))) return logs, nil } @@ -430,8 +430,8 @@ func ns(sec uint64) uint64 { return sec * 1e9 } -func newOtelcolAttrSetPayloadSizeBytes(service string, typ string, encoding string) *attribute.Set { - s := attribute.NewSet(attribute.KeyValue{Key: keyService, Value: attribute.StringValue(service)}, attribute.KeyValue{Key: "type", Value: attribute.StringValue(typ)}, attribute.KeyValue{Key: "encoding", Value: attribute.StringValue(encoding)}) +func newOtelcolAttrSetPayloadSizeBytes(typ string, encoding string) *attribute.Set { + s := attribute.NewSet(attribute.KeyValue{Key: "type", Value: attribute.StringValue(typ)}, attribute.KeyValue{Key: "encoding", Value: attribute.StringValue(encoding)}) return &s } From 1cf50027b80699155dcd446362dd8548fd486bc4 Mon Sep 17 00:00:00 2001 From: akvlad Date: Thu, 10 Oct 2024 17:19:33 +0300 Subject: [PATCH 03/11] configurable metrics --- receiver/pyroscopereceiver/README.md | 12 +++++ receiver/pyroscopereceiver/config.go | 8 ++++ receiver/pyroscopereceiver/metrics.go | 15 ++++-- receiver/pyroscopereceiver/receiver.go | 66 
++++++++++++++++++++------ 4 files changed, 83 insertions(+), 18 deletions(-) diff --git a/receiver/pyroscopereceiver/README.md b/receiver/pyroscopereceiver/README.md index e0aa639..94fb165 100644 --- a/receiver/pyroscopereceiver/README.md +++ b/receiver/pyroscopereceiver/README.md @@ -11,6 +11,18 @@ Implements the Pyroscope ingest protocol and conveys the accepted profiles as Op - `protocols`: sets the application layer protocols that the receiver will serve. See [Supported Protocols](#supported-protocols). Default is http/s on 0.0.0.0:8062 with max request body size of: 5e6 + 1e6. - `timeout`: sets the server reponse timeout. Default is 10 seconds. +- `metrics`: configures the metrics collection for the Pyroscope receiver. + - `enable`: enables or disables metrics collection. Default is true. + - `exclude_labels`: a list of label names to exclude from the metrics. Available labels are: + - `service`: name of the service provided the pprof request + - `type`: type of pprof request (jfr or pprof) + - `encoding`: not used, empty + - `error_code`: http error code response for http request count + - `status_code`: http response status code for http request count + - `exclude_metrics`: a list of metric names to exclude from collection. Available metrics are: + - `http_request_total`: Pyroscope receiver http request count. + - `request_body_uncompressed_size_bytes`: Pyroscope receiver uncompressed request body size in bytes. + - `parsed_body_uncompressed_size_bytes`: Pyroscope receiver uncompressed parsed body size in bytes. ## Example diff --git a/receiver/pyroscopereceiver/config.go b/receiver/pyroscopereceiver/config.go index 988a755..7cad55f 100644 --- a/receiver/pyroscopereceiver/config.go +++ b/receiver/pyroscopereceiver/config.go @@ -14,12 +14,20 @@ type Protocols struct { HTTP *confighttp.ServerConfig `mapstructure:"http"` } +type MetricsConfig struct { + Enable bool `mapstructure:"enable" default:"true"` + ExcludeLabels []string `mapstructure:"exclude_labels"` + ExcludeMetrics []string `mapstructure:"exclude_metrics"` +} + // Represents the receiver config within the collector's config.yaml type Config struct { Protocols Protocols `mapstructure:"protocols"` // Cofigures timeout for synchronous request handling by the receiver server Timeout time.Duration `mapstructure:"timeout"` + + Metrics MetricsConfig `mapstructure:"metrics"` } var _ component.Config = (*Config)(nil) diff --git a/receiver/pyroscopereceiver/metrics.go b/receiver/pyroscopereceiver/metrics.go index c7dbc6f..2737499 100644 --- a/receiver/pyroscopereceiver/metrics.go +++ b/receiver/pyroscopereceiver/metrics.go @@ -2,6 +2,7 @@ package pyroscopereceiver import ( "fmt" + "slices" "go.opentelemetry.io/otel/metric" ) @@ -14,22 +15,28 @@ var ( otelcolReceiverPyroscopeParsedBodyUncompressedSizeBytes metric.Int64Histogram ) -func initMetrics(meter metric.Meter) error { +func initMetrics(meter metric.Meter, conf *Config) error { var err error - if otelcolReceiverPyroscopeHttpRequestTotal, err = meter.Int64Counter( + if !conf.Metrics.Enable || slices.Contains(conf.Metrics.ExcludeMetrics, "http_request_total") { + otelcolReceiverPyroscopeHttpRequestTotal = nil + } else if otelcolReceiverPyroscopeHttpRequestTotal, err = meter.Int64Counter( fmt.Sprint(prefix, "http_request_total"), metric.WithDescription("Pyroscope receiver http request count"), ); err != nil { return err } - if otelcolReceiverPyroscopeRequestBodyUncompressedSizeBytes, err = meter.Int64Histogram( + if !conf.Metrics.Enable || 
slices.Contains(conf.Metrics.ExcludeMetrics, "request_body_uncompressed_size_bytes") { + otelcolReceiverPyroscopeRequestBodyUncompressedSizeBytes = nil + } else if otelcolReceiverPyroscopeRequestBodyUncompressedSizeBytes, err = meter.Int64Histogram( fmt.Sprint(prefix, "request_body_uncompressed_size_bytes"), metric.WithDescription("Pyroscope receiver uncompressed request body size in bytes"), metric.WithExplicitBucketBoundaries(0, 1024, 4096, 16384, 32768, 65536, 131072, 262144, 524288, 1048576), ); err != nil { return err } - if otelcolReceiverPyroscopeParsedBodyUncompressedSizeBytes, err = meter.Int64Histogram( + if !conf.Metrics.Enable || slices.Contains(conf.Metrics.ExcludeMetrics, "parsed_body_uncompressed_size_bytes") { + otelcolReceiverPyroscopeParsedBodyUncompressedSizeBytes = nil + } else if otelcolReceiverPyroscopeParsedBodyUncompressedSizeBytes, err = meter.Int64Histogram( fmt.Sprint(prefix, "parsed_body_uncompressed_size_bytes"), metric.WithDescription("Pyroscope receiver uncompressed parsed body size in bytes"), metric.WithExplicitBucketBoundaries(0, 1024, 4096, 16384, 32768, 65536, 131072, 262144, 524288, 1048576), diff --git a/receiver/pyroscopereceiver/receiver.go b/receiver/pyroscopereceiver/receiver.go index 6e88992..6688939 100644 --- a/receiver/pyroscopereceiver/receiver.go +++ b/receiver/pyroscopereceiver/receiver.go @@ -12,6 +12,7 @@ import ( "net" "net/http" "net/url" + "slices" "strconv" "strings" "sync" @@ -117,7 +118,7 @@ func newPyroscopeReceiver(cfg *Config, consumer consumer.Logs, set *receiver.Set r.mux.HandleFunc(ingestPath, func(resp http.ResponseWriter, req *http.Request) { r.httpHandlerIngest(resp, req) }) - if err := initMetrics(r.meter); err != nil { + if err := initMetrics(r.meter, cfg); err != nil { r.logger.Error(fmt.Sprintf("failed to init metrics: %s", err.Error())) return r, err } @@ -176,15 +177,23 @@ func (r *pyroscopeReceiver) handle(ctx context.Context, resp http.ResponseWriter r.handleError(ctx, resp, "text/plain", http.StatusInternalServerError, err.Error(), pm.name, errorCodeError) return } - - otelcolReceiverPyroscopeHttpRequestTotal.Add(ctx, 1, metric.WithAttributeSet(*newOtelcolAttrSetHttp(pm.name, errorCodeSuccess, http.StatusNoContent))) + if otelcolReceiverPyroscopeHttpRequestTotal != nil { + otelcolReceiverPyroscopeHttpRequestTotal.Add( + ctx, 1, + metric.WithAttributeSet(*r.newOtelcolAttrSetHttp(pm.name, errorCodeSuccess, http.StatusNoContent)), + ) + } writeResponseNoContent(resp) }() return c } func (recv *pyroscopeReceiver) handleError(ctx context.Context, resp http.ResponseWriter, contentType string, statusCode int, msg string, service string, errorCode string) { - otelcolReceiverPyroscopeHttpRequestTotal.Add(ctx, 1, metric.WithAttributeSet(*newOtelcolAttrSetHttp(service, errorCode, statusCode))) + if otelcolReceiverPyroscopeHttpRequestTotal != nil { + otelcolReceiverPyroscopeHttpRequestTotal.Add(ctx, 1, + metric.WithAttributeSet(*recv.newOtelcolAttrSetHttp(service, errorCode, statusCode))) + } + recv.logger.Error(msg) writeResponse(resp, "text/plain", statusCode, []byte(msg)) } @@ -240,12 +249,19 @@ func readParams(qs *url.Values) (params, error) { return p, nil } -func newOtelcolAttrSetHttp(service string, errorCode string, statusCode int) *attribute.Set { - s := attribute.NewSet( - attribute.KeyValue{Key: keyService, Value: attribute.StringValue(service)}, - attribute.KeyValue{Key: "error_code", Value: attribute.StringValue(errorCode)}, - attribute.KeyValue{Key: "status_code", Value: attribute.IntValue(statusCode)}, - ) +func 
(r *pyroscopeReceiver) newOtelcolAttrSetHttp(service string, errorCode string, statusCode int) *attribute.Set { + keyValues := []attribute.KeyValue{ + {Key: keyService, Value: attribute.StringValue(service)}, + {Key: "error_code", Value: attribute.StringValue(errorCode)}, + {Key: "status_code", Value: attribute.IntValue(statusCode)}, + } + for i := len(keyValues) - 1; i >= 0; i-- { + if slices.Contains(r.cfg.Metrics.ExcludeLabels, string(keyValues[i].Key)) { + keyValues[i] = keyValues[len(keyValues)-1] + keyValues = keyValues[:len(keyValues)-1] + } + } + s := attribute.NewSet(keyValues...) return &s } @@ -334,7 +350,9 @@ func (r *pyroscopeReceiver) readProfiles(ctx context.Context, req *http.Request, r.logger.Debug("received profiles", zap.String("url_query", req.URL.RawQuery)) qs := req.URL.Query() + format := formatPprof if tmp, ok = qs["format"]; ok && (tmp[0] == "jfr") { + format = formatJfr p = jfrparser.NewJfrPprofParser() } else if tmp, ok = qs["spyName"]; ok && (tmp[0] == "nodespy") { p = nodeparser.NewNodePprofParser() @@ -358,7 +376,11 @@ func (r *pyroscopeReceiver) readProfiles(ctx context.Context, req *http.Request, return logs, fmt.Errorf("failed to decompress body: %w", err) } // TODO: try measure compressed size - otelcolReceiverPyroscopeRequestBodyUncompressedSizeBytes.Record(ctx, int64(buf.Len()), metric.WithAttributeSet(*newOtelcolAttrSetPayloadSizeBytes(formatJfr, ""))) + if otelcolReceiverPyroscopeRequestBodyUncompressedSizeBytes != nil { + otelcolReceiverPyroscopeRequestBodyUncompressedSizeBytes.Record(ctx, int64(buf.Len()), + metric.WithAttributeSet(*r.newOtelcolAttrSetPayloadSizeBytes(pm.name, format, ""))) + } + resetHeaders(req) md := profile_types.Metadata{SampleRateHertz: 0} @@ -421,7 +443,11 @@ func (r *pyroscopeReceiver) readProfiles(ctx context.Context, req *http.Request, ) } // sz may be 0 and it will be recorded - otelcolReceiverPyroscopeParsedBodyUncompressedSizeBytes.Record(ctx, int64(sz), metric.WithAttributeSet(*newOtelcolAttrSetPayloadSizeBytes(formatPprof, ""))) + if otelcolReceiverPyroscopeParsedBodyUncompressedSizeBytes != nil { + otelcolReceiverPyroscopeParsedBodyUncompressedSizeBytes.Record(ctx, int64(sz), + metric.WithAttributeSet(*r.newOtelcolAttrSetPayloadSizeBytes(pm.name, formatPprof, ""))) + } + return logs, nil } @@ -429,8 +455,20 @@ func ns(sec uint64) uint64 { return sec * 1e9 } -func newOtelcolAttrSetPayloadSizeBytes(typ string, encoding string) *attribute.Set { - s := attribute.NewSet(attribute.KeyValue{Key: "type", Value: attribute.StringValue(typ)}, attribute.KeyValue{Key: "encoding", Value: attribute.StringValue(encoding)}) +func (r *pyroscopeReceiver) newOtelcolAttrSetPayloadSizeBytes(service string, typ string, + encoding string) *attribute.Set { + keyValues := []attribute.KeyValue{ + {Key: keyService, Value: attribute.StringValue(service)}, + {Key: "type", Value: attribute.StringValue(typ)}, + {Key: "encoding", Value: attribute.StringValue(encoding)}, + } + for i := len(keyValues) - 1; i >= 0; i-- { + if slices.Contains(r.cfg.Metrics.ExcludeLabels, string(keyValues[i].Key)) { + keyValues[i] = keyValues[len(keyValues)-1] + keyValues = keyValues[:len(keyValues)-1] + } + } + s := attribute.NewSet(keyValues...) 
return &s } From e432362ed655d4f95a82897800fd2937819fbb74 Mon Sep 17 00:00:00 2001 From: akvlad Date: Thu, 10 Oct 2024 21:07:10 +0300 Subject: [PATCH 04/11] memory improvement attempt --- .../ch/access_native_columnar.go | 127 +++++++++++------- 1 file changed, 78 insertions(+), 49 deletions(-) diff --git a/exporter/clickhouseprofileexporter/ch/access_native_columnar.go b/exporter/clickhouseprofileexporter/ch/access_native_columnar.go index 2b83c58..53912ea 100644 --- a/exporter/clickhouseprofileexporter/ch/access_native_columnar.go +++ b/exporter/clickhouseprofileexporter/ch/access_native_columnar.go @@ -7,6 +7,7 @@ import ( "fmt" "strconv" "sync" + "time" "github.com/ClickHouse/clickhouse-go/v2" "github.com/ClickHouse/clickhouse-go/v2/lib/driver" @@ -93,6 +94,7 @@ func (ch *clickhouseAccessNativeColumnar) InsertBatch(ls plog.Logs) (int, error) payload_type := make([]string, 0) payload := make([][]byte, 0) tree := make([][]tuple, 0) + var pooledTrees []*PooledTree functions := make([][]tuple, 0) rl := ls.ResourceLogs() @@ -179,7 +181,8 @@ func (ch *clickhouseAccessNativeColumnar) InsertBatch(ls plog.Logs) (int, error) return 0, err } - tree = append(tree, _tree) + pooledTrees = append(pooledTrees, _tree) + tree = append(tree, _tree.data) idx = offset + s ch.logger.Debug( @@ -239,13 +242,8 @@ func (ch *clickhouseAccessNativeColumnar) InsertBatch(ls plog.Logs) (int, error) return 0, err } err = b.Send() - for _, tpls := range tree { - for _, t := range tpls { - for _, v := range t[3].([]tuple) { - triples.put(v) - } - quadruples.put(t) - } + for _, tpls := range pooledTrees { + trees.put(tpls) } return offset, err } @@ -302,58 +300,90 @@ func readFunctionsFromMap(m pcommon.Map) ([]tuple, error) { } type LimitedPool struct { - m sync.RWMutex - pool *sync.Pool - size int + m sync.RWMutex + pool [20]*sync.Pool + createPool func() *sync.Pool } -func (l *LimitedPool) get() tuple { +type PooledTree struct { + time time.Time + triplesCount int + data []tuple + triples []tuple +} + +func (l *LimitedPool) get(quadruples int, triples int) *PooledTree { l.m.Lock() defer l.m.Unlock() - l.size-- - if l.size < 0 { - l.size = 0 + var pool *sync.Pool + if triples >= 20 { + pool = l.createPool() + } else if l.pool[triples] == nil { + l.pool[triples] = l.createPool() + pool = l.pool[triples] + } else { + pool = l.pool[triples] + } + tree := pool.Get().(*PooledTree) + var redo bool + if cap(tree.triples) < quadruples*triples { + tree.triples = make([]tuple, quadruples*triples) + for i := range tree.triples { + tree.triples[i] = tuple{nil, nil, nil} + } + redo = true + } + tree.triples = tree.triples[:quadruples*triples] + if cap(tree.data) < quadruples { + tree.data = make([]tuple, quadruples) + redo = true } - return l.pool.Get().(tuple) + tree.data = tree.data[:quadruples] + if redo || tree.triplesCount != triples { + j := 0 + for i := range tree.data { + _triples := tree.triples[j : j+triples] + j += triples + tree.data[i] = tuple{nil, nil, nil, _triples} + } + } + tree.triplesCount = triples + return tree } -func (l *LimitedPool) put(t tuple) { +func (l *LimitedPool) put(t *PooledTree) { l.m.Lock() defer l.m.Unlock() - if l.size >= 100000 { + if t.triplesCount >= 20 { return } - l.size++ - l.pool.Put(t) -} - -var triples = LimitedPool{ - pool: &sync.Pool{ - New: func() interface{} { - return make(tuple, 3) - }, - }, + pool := l.pool[t.triplesCount] + if time.Now().Sub(t.time) < time.Minute { + pool.Put(t) + } } -var quadruples = LimitedPool{ - pool: &sync.Pool{ - New: func() interface{} { - return 
make(tuple, 4) - }, +var trees = LimitedPool{ + createPool: func() *sync.Pool { + return &sync.Pool{ + New: func() interface{} { + return &PooledTree{time: time.Now()} + }, + } }, } -func readTreeFromMap(m pcommon.Map) ([]tuple, error) { +func readTreeFromMap(m pcommon.Map) (*PooledTree, error) { raw, _ := m.Get("tree") bRaw := bytes.NewReader(raw.Bytes().AsRaw()) - size, err := binary.ReadVarint(bRaw) + treeSize, err := binary.ReadVarint(bRaw) if err != nil { return nil, err } - res := make([]tuple, size) + var res *PooledTree - for i := int64(0); i < size; i++ { + for i := int64(0); i < treeSize; i++ { parentId, err := binary.ReadUvarint(bRaw) if err != nil { return nil, err @@ -374,8 +404,11 @@ func readTreeFromMap(m pcommon.Map) ([]tuple, error) { return nil, err } - values := make([]tuple, size) - for i := range values { + if res == nil { + res = trees.get(int(treeSize), int(size)) + } + + for j := int64(0); j < size; j++ { size, err := binary.ReadVarint(bRaw) if err != nil { return nil, err @@ -395,17 +428,13 @@ func readTreeFromMap(m pcommon.Map) ([]tuple, error) { if err != nil { return nil, err } - - values[i] = triples.get() // tuple{name, self, total} - values[i][0] = name - values[i][1] = self - values[i][2] = total + res.data[i][3].([]tuple)[j][0] = name + res.data[i][3].([]tuple)[j][1] = self + res.data[i][3].([]tuple)[j][2] = total } - res[i] = quadruples.get() // tuple{parentId, fnId, nodeId, values} - res[i][0] = parentId - res[i][1] = fnId - res[i][2] = nodeId - res[i][3] = values + res.data[i][0] = parentId + res.data[i][1] = fnId + res.data[i][2] = nodeId } return res, nil } From c52cf3878aa9bc92b179a488b125dc6894602f5a Mon Sep 17 00:00:00 2001 From: akvlad Date: Mon, 21 Oct 2024 16:26:54 +0300 Subject: [PATCH 05/11] traces ingestion V2 --- exporter/qrynexporter/README.md | 1 + exporter/qrynexporter/config.go | 2 + exporter/qrynexporter/schema.go | 58 ++++++++++++++++ exporter/qrynexporter/traces.go | 27 +++++++- exporter/qrynexporter/traces_v2.go | 102 +++++++++++++++++++++++++++++ 5 files changed, 189 insertions(+), 1 deletion(-) create mode 100644 exporter/qrynexporter/traces_v2.go diff --git a/exporter/qrynexporter/README.md b/exporter/qrynexporter/README.md index 7daa99c..35527da 100644 --- a/exporter/qrynexporter/README.md +++ b/exporter/qrynexporter/README.md @@ -11,6 +11,7 @@ - `dsn` (required): Clickhouse's dsn. - `clustered_clickhouse` (required): true if clickhouse cluster is used +- `traces_distributed_export_v2`: use improved ingestion algorythm for traces. Data ingestion is sess performant but more evenly distributed # Example: ## Simple Trace Data diff --git a/exporter/qrynexporter/config.go b/exporter/qrynexporter/config.go index c71651b..4075e71 100644 --- a/exporter/qrynexporter/config.go +++ b/exporter/qrynexporter/config.go @@ -30,6 +30,8 @@ type Config struct { configretry.BackOffConfig `mapstructure:"retry_on_failure"` exporterhelper.QueueSettings `mapstructure:"sending_queue"` + TracesDitstibutedExportV2 bool `mapstructure:"traces_distributed_export_v2"` + ClusteredClickhouse bool `mapstructure:"clustered_clickhouse"` // DSN is the ClickHouse server Data Source Name. 
diff --git a/exporter/qrynexporter/schema.go b/exporter/qrynexporter/schema.go index fed1546..34dce51 100644 --- a/exporter/qrynexporter/schema.go +++ b/exporter/qrynexporter/schema.go @@ -57,6 +57,40 @@ var ( } ) +func TracesV2InputSQL(clustered bool) string { + dist := "" + if clustered { + dist = "_dist" + } + return fmt.Sprintf(`INSERT INTO tempo_traces%s ( + oid, + trace_id, + span_id, + parent_id, + name, + timestamp_ns, + duration_ns, + service_name, + payload_type, + payload)`, dist) +} + +func TracesTagsV2InputSQL(clustered bool) string { + dist := "" + if clustered { + dist = "_dist" + } + return fmt.Sprintf(`INSERT INTO tempo_traces_attrs_gin%s ( + oid, + date, + key, + val, + trace_id, + span_id, + timestamp_ns, + duration)`, dist) +} + // Note: https://github.com/metrico/qryn/blob/master/lib/db/maintain/scripts.js // We need to align with the schema here. // @@ -90,6 +124,30 @@ type Trace struct { Tags [][]string `ch:"tags"` } +type TraceV2 struct { + OID string `ch:"oid"` + TraceID []byte `ch:"trace_id"` + SpanID []byte `ch:"span_id"` + ParentID []byte `ch:"parent_id"` + Name string `ch:"name"` + TimestampNs int64 `ch:"timestamp_ns"` + DurationNs int64 `ch:"duration_ns"` + ServiceName string `ch:"service_name"` + PayloadType int8 `ch:"payload_type"` + Payload string `ch:"payload"` +} + +type TraceTagsV2 struct { + OID string `ch:"oid"` + Date time.Time `ch:"date"` + Key string `ch:"key"` + Val string `ch:"val"` + TraceID []byte `ch:"trace_id"` + SpanID []byte `ch:"span_id"` + TimestampNs int64 `ch:"timestamp_ns"` + DurationNs int64 `ch:"duration"` +} + // Sample represent sample data // `CREATE TABLE IF NOT EXISTS samples_v3 // ( diff --git a/exporter/qrynexporter/traces.go b/exporter/qrynexporter/traces.go index 86dec6d..e33979c 100644 --- a/exporter/qrynexporter/traces.go +++ b/exporter/qrynexporter/traces.go @@ -50,6 +50,7 @@ type tracesExporter struct { db clickhouse.Conn cluster bool + v2 bool } // newTracesExporter returns a SpanWriter for the database @@ -67,6 +68,7 @@ func newTracesExporter(logger *zap.Logger, cfg *Config, set *exporter.Settings) meter: set.MeterProvider.Meter(typeStr), db: db, cluster: cfg.ClusteredClickhouse, + v2: cfg.ClusteredClickhouse && cfg.TracesDitstibutedExportV2, } if err := initMetrics(exp.meter); err != nil { exp.logger.Error(fmt.Sprintf("failed to init metrics: %s", err.Error())) @@ -164,7 +166,14 @@ func extractScopeTags(il pcommon.InstrumentationScope, tags map[string]string) { func (e *tracesExporter) exportResourceSapns(ctx context.Context, resourceSpans ptrace.ResourceSpansSlice) error { isCluster := ctx.Value("cluster").(bool) - batch, err := e.db.PrepareBatch(ctx, tracesInputSQL(isCluster)) + var batch driver.Batch + var err error + if e.v2 { + batch, err = e.prepareBatchV2(ctx) + } else { + batch, err = e.db.PrepareBatch(ctx, tracesInputSQL(isCluster)) + } + if err != nil { return err } @@ -187,6 +196,22 @@ func (e *tracesExporter) exportResourceSapns(ctx context.Context, resourceSpans return nil } +func (e *tracesExporter) prepareBatchV2(ctx context.Context) (driver.Batch, error) { + batch, err := e.db.PrepareBatch(ctx, TracesV2InputSQL(e.cluster)) + if err != nil { + return nil, err + } + subBatch, err := e.db.PrepareBatch(ctx, TracesTagsV2InputSQL(e.cluster)) + if err != nil { + batch.Abort() + return nil, err + } + return &batchV2{ + Batch: batch, + subBatch: subBatch, + }, nil +} + // traceDataPusher implements OTEL exporterhelper.traceDataPusher func (e *tracesExporter) pushTraceData(ctx context.Context, td ptrace.Traces) 
error { _ctx := context.WithValue(ctx, "cluster", e.cluster) diff --git a/exporter/qrynexporter/traces_v2.go b/exporter/qrynexporter/traces_v2.go new file mode 100644 index 0000000..37da8a9 --- /dev/null +++ b/exporter/qrynexporter/traces_v2.go @@ -0,0 +1,102 @@ +package qrynexporter + +import ( + "encoding/hex" + "fmt" + "github.com/ClickHouse/clickhouse-go/v2/lib/driver" + "time" +) + +type batchV2 struct { + driver.Batch + subBatch driver.Batch +} + +func (b *batchV2) AppendStruct(data any) error { + _data, ok := data.(*Trace) + if !ok { + return fmt.Errorf("invalid data type, expected *Trace, got %T", data) + } + binTraceId, err := unhexAndPad(_data.TraceID, 16) + if err != nil { + return err + } + binParentID, err := unhexAndPad(_data.ParentID, 8) + if err != nil { + return err + } + binSpanID, err := unhexAndPad(_data.SpanID, 8) + if err != nil { + return err + } + trace := &TraceV2{ + OID: "0", + TraceID: binTraceId, + SpanID: binSpanID, + ParentID: binParentID, + Name: _data.Name, + TimestampNs: _data.TimestampNs, + DurationNs: _data.DurationNs, + ServiceName: _data.ServiceName, + PayloadType: _data.PayloadType, + Payload: _data.Payload, + } + err = b.Batch.AppendStruct(trace) + if err != nil { + return err + } + for _, tag := range _data.Tags { + attr := &TraceTagsV2{ + OID: "0", + Date: time.Unix(0, trace.TimestampNs).Truncate(time.Hour * 24), + Key: tag[0], + Val: tag[1], + TraceID: binTraceId, + SpanID: binSpanID, + TimestampNs: _data.TimestampNs, + DurationNs: _data.DurationNs, + } + err = b.subBatch.AppendStruct(attr) + if err != nil { + return err + } + } + return nil +} + +func (b *batchV2) Abort() error { + var errs [2]error + errs[0] = b.Batch.Abort() + errs[1] = b.subBatch.Abort() + for _, err := range errs { + if err != nil { + return err + } + } + return nil +} + +func (b *batchV2) Send() error { + var errs [2]error + errs[0] = b.Batch.Send() + errs[1] = b.subBatch.Send() + for _, err := range errs { + if err != nil { + return err + } + } + return nil +} + +func unhexAndPad(s string, size int) ([]byte, error) { + bStr, err := hex.DecodeString(s) + if err != nil { + return nil, err + } + if len(bStr) < size { + res := make([]byte, size) + copy(res[size-len(bStr):], bStr) + return res, nil + } + return bStr[size-len(bStr):], nil +} From 7360df401544c00edad7a1444f330f1c34680cb3 Mon Sep 17 00:00:00 2001 From: akvlad Date: Mon, 21 Oct 2024 17:08:00 +0300 Subject: [PATCH 06/11] config rename --- exporter/qrynexporter/README.md | 3 ++- exporter/qrynexporter/config.go | 2 +- exporter/qrynexporter/traces.go | 2 +- 3 files changed, 4 insertions(+), 3 deletions(-) diff --git a/exporter/qrynexporter/README.md b/exporter/qrynexporter/README.md index 35527da..b2b0b4f 100644 --- a/exporter/qrynexporter/README.md +++ b/exporter/qrynexporter/README.md @@ -11,7 +11,8 @@ - `dsn` (required): Clickhouse's dsn. - `clustered_clickhouse` (required): true if clickhouse cluster is used -- `traces_distributed_export_v2`: use improved ingestion algorythm for traces. Data ingestion is sess performant but more evenly distributed +- `client_side_trace_processing`: use improved traces ingestion algorythm for clickhouse clusters. 
+Data ingestion is sess performant but more evenly distributed # Example: ## Simple Trace Data diff --git a/exporter/qrynexporter/config.go b/exporter/qrynexporter/config.go index 4075e71..b025e7a 100644 --- a/exporter/qrynexporter/config.go +++ b/exporter/qrynexporter/config.go @@ -30,7 +30,7 @@ type Config struct { configretry.BackOffConfig `mapstructure:"retry_on_failure"` exporterhelper.QueueSettings `mapstructure:"sending_queue"` - TracesDitstibutedExportV2 bool `mapstructure:"traces_distributed_export_v2"` + ClientSideTraceProcessing bool `mapstructure:"client_side_trace_processing"` ClusteredClickhouse bool `mapstructure:"clustered_clickhouse"` diff --git a/exporter/qrynexporter/traces.go b/exporter/qrynexporter/traces.go index e33979c..b1e71f2 100644 --- a/exporter/qrynexporter/traces.go +++ b/exporter/qrynexporter/traces.go @@ -68,7 +68,7 @@ func newTracesExporter(logger *zap.Logger, cfg *Config, set *exporter.Settings) meter: set.MeterProvider.Meter(typeStr), db: db, cluster: cfg.ClusteredClickhouse, - v2: cfg.ClusteredClickhouse && cfg.TracesDitstibutedExportV2, + v2: cfg.ClusteredClickhouse && cfg.ClientSideTraceProcessing, } if err := initMetrics(exp.meter); err != nil { exp.logger.Error(fmt.Sprintf("failed to init metrics: %s", err.Error())) From fd0f14fffc67fcbf2b7aae7a439072f92089fccd Mon Sep 17 00:00:00 2001 From: akvlad Date: Mon, 21 Oct 2024 17:17:45 +0300 Subject: [PATCH 07/11] specify metrics to exclude labels from --- receiver/pyroscopereceiver/README.md | 10 +++++++++- receiver/pyroscopereceiver/config.go | 11 ++++++++--- receiver/pyroscopereceiver/receiver.go | 25 +++++++++++++++++-------- 3 files changed, 34 insertions(+), 12 deletions(-) diff --git a/receiver/pyroscopereceiver/README.md b/receiver/pyroscopereceiver/README.md index 94fb165..2ab4ffc 100644 --- a/receiver/pyroscopereceiver/README.md +++ b/receiver/pyroscopereceiver/README.md @@ -13,7 +13,8 @@ Implements the Pyroscope ingest protocol and conveys the accepted profiles as Op - `timeout`: sets the server reponse timeout. Default is 10 seconds. - `metrics`: configures the metrics collection for the Pyroscope receiver. - `enable`: enables or disables metrics collection. Default is true. - - `exclude_labels`: a list of label names to exclude from the metrics. Available labels are: + - `exclude_labels`: a list of metrics and label names to exclude from the metrics. + Available metrics are listed further. Metric name may be empty. 
Available labels are: - `service`: name of the service provided the pprof request - `type`: type of pprof request (jfr or pprof) - `encoding`: not used, empty @@ -29,6 +30,13 @@ Implements the Pyroscope ingest protocol and conveys the accepted profiles as Op ```yaml receivers: pyroscopereceiver: + metrics: + enable: true + exclude_labels: + - metric: request_body_uncompressed_size_bytes + label: service + exclude_metrics: + - http_requests_total protocols: http: endpoint: 0.0.0.0:8062 diff --git a/receiver/pyroscopereceiver/config.go b/receiver/pyroscopereceiver/config.go index 7cad55f..86fecba 100644 --- a/receiver/pyroscopereceiver/config.go +++ b/receiver/pyroscopereceiver/config.go @@ -14,10 +14,15 @@ type Protocols struct { HTTP *confighttp.ServerConfig `mapstructure:"http"` } +type ExcludeLabel struct { + Metric string `mapstructure:"metric"` + Label string `mapstructure:"label"` +} + type MetricsConfig struct { - Enable bool `mapstructure:"enable" default:"true"` - ExcludeLabels []string `mapstructure:"exclude_labels"` - ExcludeMetrics []string `mapstructure:"exclude_metrics"` + Enable bool `mapstructure:"enable" default:"true"` + ExcludeLabels []ExcludeLabel `mapstructure:"exclude_labels"` + ExcludeMetrics []string `mapstructure:"exclude_metrics"` } // Represents the receiver config within the collector's config.yaml diff --git a/receiver/pyroscopereceiver/receiver.go b/receiver/pyroscopereceiver/receiver.go index 6688939..d081d20 100644 --- a/receiver/pyroscopereceiver/receiver.go +++ b/receiver/pyroscopereceiver/receiver.go @@ -180,7 +180,8 @@ func (r *pyroscopeReceiver) handle(ctx context.Context, resp http.ResponseWriter if otelcolReceiverPyroscopeHttpRequestTotal != nil { otelcolReceiverPyroscopeHttpRequestTotal.Add( ctx, 1, - metric.WithAttributeSet(*r.newOtelcolAttrSetHttp(pm.name, errorCodeSuccess, http.StatusNoContent)), + metric.WithAttributeSet(*r.newOtelcolAttrSetHttp( + "http_request_total", pm.name, errorCodeSuccess, http.StatusNoContent)), ) } writeResponseNoContent(resp) @@ -191,7 +192,8 @@ func (r *pyroscopeReceiver) handle(ctx context.Context, resp http.ResponseWriter func (recv *pyroscopeReceiver) handleError(ctx context.Context, resp http.ResponseWriter, contentType string, statusCode int, msg string, service string, errorCode string) { if otelcolReceiverPyroscopeHttpRequestTotal != nil { otelcolReceiverPyroscopeHttpRequestTotal.Add(ctx, 1, - metric.WithAttributeSet(*recv.newOtelcolAttrSetHttp(service, errorCode, statusCode))) + metric.WithAttributeSet(*recv.newOtelcolAttrSetHttp( + "http_request_total", service, errorCode, statusCode))) } recv.logger.Error(msg) @@ -249,14 +251,17 @@ func readParams(qs *url.Values) (params, error) { return p, nil } -func (r *pyroscopeReceiver) newOtelcolAttrSetHttp(service string, errorCode string, statusCode int) *attribute.Set { +func (r *pyroscopeReceiver) newOtelcolAttrSetHttp(metric string, service string, errorCode string, + statusCode int) *attribute.Set { keyValues := []attribute.KeyValue{ {Key: keyService, Value: attribute.StringValue(service)}, {Key: "error_code", Value: attribute.StringValue(errorCode)}, {Key: "status_code", Value: attribute.IntValue(statusCode)}, } for i := len(keyValues) - 1; i >= 0; i-- { - if slices.Contains(r.cfg.Metrics.ExcludeLabels, string(keyValues[i].Key)) { + if slices.ContainsFunc(r.cfg.Metrics.ExcludeLabels, func(label ExcludeLabel) bool { + return (label.Metric == metric || label.Metric == "") && label.Label == string(keyValues[i].Key) + }) { keyValues[i] = keyValues[len(keyValues)-1] 
keyValues = keyValues[:len(keyValues)-1] } @@ -378,7 +383,8 @@ func (r *pyroscopeReceiver) readProfiles(ctx context.Context, req *http.Request, // TODO: try measure compressed size if otelcolReceiverPyroscopeRequestBodyUncompressedSizeBytes != nil { otelcolReceiverPyroscopeRequestBodyUncompressedSizeBytes.Record(ctx, int64(buf.Len()), - metric.WithAttributeSet(*r.newOtelcolAttrSetPayloadSizeBytes(pm.name, format, ""))) + metric.WithAttributeSet(*r.newOtelcolAttrSetPayloadSizeBytes("request_body_uncompressed_size_bytes", + pm.name, format, ""))) } resetHeaders(req) @@ -445,7 +451,8 @@ func (r *pyroscopeReceiver) readProfiles(ctx context.Context, req *http.Request, // sz may be 0 and it will be recorded if otelcolReceiverPyroscopeParsedBodyUncompressedSizeBytes != nil { otelcolReceiverPyroscopeParsedBodyUncompressedSizeBytes.Record(ctx, int64(sz), - metric.WithAttributeSet(*r.newOtelcolAttrSetPayloadSizeBytes(pm.name, formatPprof, ""))) + metric.WithAttributeSet(*r.newOtelcolAttrSetPayloadSizeBytes( + "parsed_body_uncompressed_size_bytes", pm.name, formatPprof, ""))) } return logs, nil @@ -455,7 +462,7 @@ func ns(sec uint64) uint64 { return sec * 1e9 } -func (r *pyroscopeReceiver) newOtelcolAttrSetPayloadSizeBytes(service string, typ string, +func (r *pyroscopeReceiver) newOtelcolAttrSetPayloadSizeBytes(metric string, service string, typ string, encoding string) *attribute.Set { keyValues := []attribute.KeyValue{ {Key: keyService, Value: attribute.StringValue(service)}, @@ -463,7 +470,9 @@ func (r *pyroscopeReceiver) newOtelcolAttrSetPayloadSizeBytes(service string, ty {Key: "encoding", Value: attribute.StringValue(encoding)}, } for i := len(keyValues) - 1; i >= 0; i-- { - if slices.Contains(r.cfg.Metrics.ExcludeLabels, string(keyValues[i].Key)) { + if slices.ContainsFunc(r.cfg.Metrics.ExcludeLabels, func(label ExcludeLabel) bool { + return (label.Metric == metric || label.Metric == "") && label.Label == string(keyValues[i].Key) + }) { keyValues[i] = keyValues[len(keyValues)-1] keyValues = keyValues[:len(keyValues)-1] } From 45fdef0d5f82108f28e44cfa11360d10c2bc4e35 Mon Sep 17 00:00:00 2001 From: Cluas Date: Mon, 21 Oct 2024 22:14:43 +0800 Subject: [PATCH 08/11] refactor: improve the code structure and documentation of qrynexporter. Rename the batchV2 struct to traceWithTagsBatch to improve code readability. Update the names of struct fields to make them more descriptive. Rename the traces_v2.go file to trace_batch_processor.go. Use a custom contextKey type in the pushTraceData function to resolve the SA1029 warning. Optimize README.md to provide more detailed configuration instructions. These changes are aimed at improving code quality, maintainability, and documentation clarity. --- exporter/qrynexporter/README.md | 14 +++++ exporter/qrynexporter/config.go | 4 ++ exporter/qrynexporter/logs.go | 2 +- exporter/qrynexporter/metrics.go | 2 +- exporter/qrynexporter/schema.go | 10 ++-- ...{traces_v2.go => trace_batch_processor.go} | 51 ++++++++++--------- exporter/qrynexporter/traces.go | 39 +++++++++----- 7 files changed, 76 insertions(+), 46 deletions(-) rename exporter/qrynexporter/{traces_v2.go => trace_batch_processor.go} (61%) diff --git a/exporter/qrynexporter/README.md b/exporter/qrynexporter/README.md index b2b0b4f..ce0a325 100644 --- a/exporter/qrynexporter/README.md +++ b/exporter/qrynexporter/README.md @@ -8,11 +8,25 @@ # Configuration options: +- `dsn` (required): Data Source Name for Clickhouse. 
+ - Example: `tcp://localhost:9000/?database=cloki` +- `clustered_clickhouse` (required): + - Type: boolean + - Description: Set to `true` if using a Clickhouse cluster; otherwise, set to `false`. + +- `client_side_trace_processing` (required): + - Type: boolean + - Default: `true` + - Description: Enables client-side processing of trace data. This can improve performance but may increase client-side resource usage. + +<<<<<<< HEAD - `dsn` (required): Clickhouse's dsn. - `clustered_clickhouse` (required): true if clickhouse cluster is used - `client_side_trace_processing`: use improved traces ingestion algorythm for clickhouse clusters. Data ingestion is sess performant but more evenly distributed +======= +>>>>>>> e35202d (refactor: improve the code structure and documentation of qrynexporter.) # Example: ## Simple Trace Data diff --git a/exporter/qrynexporter/config.go b/exporter/qrynexporter/config.go index b025e7a..176c28c 100644 --- a/exporter/qrynexporter/config.go +++ b/exporter/qrynexporter/config.go @@ -30,6 +30,10 @@ type Config struct { configretry.BackOffConfig `mapstructure:"retry_on_failure"` exporterhelper.QueueSettings `mapstructure:"sending_queue"` +<<<<<<< HEAD +======= + // ClientSideTraceProcessing is a boolean that indicates whether to process traces on the client side. +>>>>>>> e35202d (refactor: improve the code structure and documentation of qrynexporter.) ClientSideTraceProcessing bool `mapstructure:"client_side_trace_processing"` ClusteredClickhouse bool `mapstructure:"clustered_clickhouse"` diff --git a/exporter/qrynexporter/logs.go b/exporter/qrynexporter/logs.go index cc8eafc..7791965 100644 --- a/exporter/qrynexporter/logs.go +++ b/exporter/qrynexporter/logs.go @@ -441,7 +441,7 @@ func (e *logsExporter) pushLogsData(ctx context.Context, ld plog.Logs) error { } } - if err := batchSamplesAndTimeSeries(context.WithValue(ctx, "cluster", e.cluster), e.db, samples, timeSeries); err != nil { + if err := batchSamplesAndTimeSeries(context.WithValue(ctx, clusterKey, e.cluster), e.db, samples, timeSeries); err != nil { otelcolExporterQrynBatchInsertDurationMillis.Record(ctx, time.Now().UnixMilli()-start.UnixMilli(), metric.WithAttributeSet(*newOtelcolAttrSetBatch(errorCodeError, dataTypeLogs))) e.logger.Error(fmt.Sprintf("failed to insert batch: [%s]", err.Error())) return err diff --git a/exporter/qrynexporter/metrics.go b/exporter/qrynexporter/metrics.go index c2f0fd4..d1c4618 100644 --- a/exporter/qrynexporter/metrics.go +++ b/exporter/qrynexporter/metrics.go @@ -491,7 +491,7 @@ func (e *metricsExporter) pushMetricsData(ctx context.Context, md pmetric.Metric } } - if err := batchSamplesAndTimeSeries(context.WithValue(ctx, "cluster", e.cluster), e.db, samples, timeSeries); err != nil { + if err := batchSamplesAndTimeSeries(context.WithValue(ctx, clusterKey, e.cluster), e.db, samples, timeSeries); err != nil { otelcolExporterQrynBatchInsertDurationMillis.Record(ctx, time.Now().UnixMilli()-start.UnixMilli(), metric.WithAttributeSet(*newOtelcolAttrSetBatch(errorCodeError, dataTypeMetrics))) e.logger.Error(fmt.Sprintf("failed to insert batch: [%s]", err.Error())) return err diff --git a/exporter/qrynexporter/schema.go b/exporter/qrynexporter/schema.go index 34dce51..c84c745 100644 --- a/exporter/qrynexporter/schema.go +++ b/exporter/qrynexporter/schema.go @@ -20,7 +20,7 @@ import ( ) var ( - tracesInputSQL = func(clustered bool) string { + tracesInputSQL = func(_ bool) string { return `INSERT INTO traces_input ( trace_id, span_id, @@ -110,8 +110,8 @@ func 
TracesTagsV2InputSQL(clustered bool) string { // // ) Engine=Null -// Trace represent trace model -type Trace struct { +// TraceInput represent trace model +type TraceInput struct { TraceID string `ch:"trace_id"` SpanID string `ch:"span_id"` ParentID string `ch:"parent_id"` @@ -124,7 +124,7 @@ type Trace struct { Tags [][]string `ch:"tags"` } -type TraceV2 struct { +type TempoTrace struct { OID string `ch:"oid"` TraceID []byte `ch:"trace_id"` SpanID []byte `ch:"span_id"` @@ -137,7 +137,7 @@ type TraceV2 struct { Payload string `ch:"payload"` } -type TraceTagsV2 struct { +type TempoTraceTag struct { OID string `ch:"oid"` Date time.Time `ch:"date"` Key string `ch:"key"` diff --git a/exporter/qrynexporter/traces_v2.go b/exporter/qrynexporter/trace_batch_processor.go similarity index 61% rename from exporter/qrynexporter/traces_v2.go rename to exporter/qrynexporter/trace_batch_processor.go index 37da8a9..6fcde08 100644 --- a/exporter/qrynexporter/traces_v2.go +++ b/exporter/qrynexporter/trace_batch_processor.go @@ -3,60 +3,61 @@ package qrynexporter import ( "encoding/hex" "fmt" - "github.com/ClickHouse/clickhouse-go/v2/lib/driver" "time" + + "github.com/ClickHouse/clickhouse-go/v2/lib/driver" ) -type batchV2 struct { +type traceWithTagsBatch struct { driver.Batch - subBatch driver.Batch + tagsBatch driver.Batch } -func (b *batchV2) AppendStruct(data any) error { - _data, ok := data.(*Trace) +func (b *traceWithTagsBatch) AppendStruct(v any) error { + ti, ok := v.(*TraceInput) if !ok { - return fmt.Errorf("invalid data type, expected *Trace, got %T", data) + return fmt.Errorf("invalid data type, expected *Trace, got %T", v) } - binTraceId, err := unhexAndPad(_data.TraceID, 16) + binTraceId, err := unhexAndPad(ti.TraceID, 16) if err != nil { return err } - binParentID, err := unhexAndPad(_data.ParentID, 8) + binParentID, err := unhexAndPad(ti.ParentID, 8) if err != nil { return err } - binSpanID, err := unhexAndPad(_data.SpanID, 8) + binSpanID, err := unhexAndPad(ti.SpanID, 8) if err != nil { return err } - trace := &TraceV2{ + trace := &TempoTrace{ OID: "0", TraceID: binTraceId, SpanID: binSpanID, ParentID: binParentID, - Name: _data.Name, - TimestampNs: _data.TimestampNs, - DurationNs: _data.DurationNs, - ServiceName: _data.ServiceName, - PayloadType: _data.PayloadType, - Payload: _data.Payload, + Name: ti.Name, + TimestampNs: ti.TimestampNs, + DurationNs: ti.DurationNs, + ServiceName: ti.ServiceName, + PayloadType: ti.PayloadType, + Payload: ti.Payload, } err = b.Batch.AppendStruct(trace) if err != nil { return err } - for _, tag := range _data.Tags { - attr := &TraceTagsV2{ + for _, tag := range ti.Tags { + attr := &TempoTraceTag{ OID: "0", Date: time.Unix(0, trace.TimestampNs).Truncate(time.Hour * 24), Key: tag[0], Val: tag[1], TraceID: binTraceId, SpanID: binSpanID, - TimestampNs: _data.TimestampNs, - DurationNs: _data.DurationNs, + TimestampNs: ti.TimestampNs, + DurationNs: ti.DurationNs, } - err = b.subBatch.AppendStruct(attr) + err = b.tagsBatch.AppendStruct(attr) if err != nil { return err } @@ -64,10 +65,10 @@ func (b *batchV2) AppendStruct(data any) error { return nil } -func (b *batchV2) Abort() error { +func (b *traceWithTagsBatch) Abort() error { var errs [2]error errs[0] = b.Batch.Abort() - errs[1] = b.subBatch.Abort() + errs[1] = b.tagsBatch.Abort() for _, err := range errs { if err != nil { return err @@ -76,10 +77,10 @@ func (b *batchV2) Abort() error { return nil } -func (b *batchV2) Send() error { +func (b *traceWithTagsBatch) Send() error { var errs [2]error errs[0] = 
b.Batch.Send() - errs[1] = b.subBatch.Send() + errs[1] = b.tagsBatch.Send() for _, err := range errs { if err != nil { return err diff --git a/exporter/qrynexporter/traces.go b/exporter/qrynexporter/traces.go index b1e71f2..a63e27e 100644 --- a/exporter/qrynexporter/traces.go +++ b/exporter/qrynexporter/traces.go @@ -34,8 +34,11 @@ import ( "google.golang.org/protobuf/encoding/protojson" ) +type contextKey string + const ( - spanLinkDataFormat = "%s|%s|%s|%s|%d" + spanLinkDataFormat = "%s|%s|%s|%s|%d" + clusterKey contextKey = "cluster" ) var delegate = &protojson.MarshalOptions{ @@ -48,9 +51,9 @@ type tracesExporter struct { logger *zap.Logger meter metric.Meter - db clickhouse.Conn - cluster bool - v2 bool + db clickhouse.Conn + cluster bool + clientSide bool } // newTracesExporter returns a SpanWriter for the database @@ -64,11 +67,19 @@ func newTracesExporter(logger *zap.Logger, cfg *Config, set *exporter.Settings) return nil, err } exp := &tracesExporter{ +<<<<<<< HEAD logger: logger, meter: set.MeterProvider.Meter(typeStr), db: db, cluster: cfg.ClusteredClickhouse, v2: cfg.ClusteredClickhouse && cfg.ClientSideTraceProcessing, +======= + logger: logger, + meter: set.MeterProvider.Meter(typeStr), + db: db, + cluster: cfg.ClusteredClickhouse, + clientSide: cfg.ClientSideTraceProcessing, +>>>>>>> e35202d (refactor: improve the code structure and documentation of qrynexporter.) } if err := initMetrics(exp.meter); err != nil { exp.logger.Error(fmt.Sprintf("failed to init metrics: %s", err.Error())) @@ -165,11 +176,11 @@ func extractScopeTags(il pcommon.InstrumentationScope, tags map[string]string) { } func (e *tracesExporter) exportResourceSapns(ctx context.Context, resourceSpans ptrace.ResourceSpansSlice) error { - isCluster := ctx.Value("cluster").(bool) + isCluster := ctx.Value(clusterKey).(bool) var batch driver.Batch var err error - if e.v2 { - batch, err = e.prepareBatchV2(ctx) + if e.clientSide { + batch, err = e.prepareBatchClientSide(ctx) } else { batch, err = e.db.PrepareBatch(ctx, tracesInputSQL(isCluster)) } @@ -196,7 +207,7 @@ func (e *tracesExporter) exportResourceSapns(ctx context.Context, resourceSpans return nil } -func (e *tracesExporter) prepareBatchV2(ctx context.Context) (driver.Batch, error) { +func (e *tracesExporter) prepareBatchClientSide(ctx context.Context) (driver.Batch, error) { batch, err := e.db.PrepareBatch(ctx, TracesV2InputSQL(e.cluster)) if err != nil { return nil, err @@ -206,15 +217,15 @@ func (e *tracesExporter) prepareBatchV2(ctx context.Context) (driver.Batch, erro batch.Abort() return nil, err } - return &batchV2{ - Batch: batch, - subBatch: subBatch, + return &traceWithTagsBatch{ + Batch: batch, + tagsBatch: subBatch, }, nil } // traceDataPusher implements OTEL exporterhelper.traceDataPusher func (e *tracesExporter) pushTraceData(ctx context.Context, td ptrace.Traces) error { - _ctx := context.WithValue(ctx, "cluster", e.cluster) + _ctx := context.WithValue(ctx, clusterKey, e.cluster) start := time.Now() if err := e.exportResourceSapns(_ctx, td.ResourceSpans()); err != nil { otelcolExporterQrynBatchInsertDurationMillis.Record(ctx, time.Now().UnixMilli()-start.UnixMilli(), metric.WithAttributeSet(*newOtelcolAttrSetBatch(errorCodeError, dataTypeTraces))) @@ -387,7 +398,7 @@ func marshalSpanToJSON(span ptrace.Span, mergedAttributes pcommon.Map) ([]byte, return delegate.Marshal(otlpSpan) } -func convertTracesInput(span ptrace.Span, resource pcommon.Resource, serviceName string, tags map[string]string) (*Trace, error) { +func convertTracesInput(span 
ptrace.Span, resource pcommon.Resource, serviceName string, tags map[string]string) (*TraceInput, error) { durationNano := uint64(span.EndTimestamp() - span.StartTimestamp()) tags = aggregateSpanTags(span, tags) tags["service.name"] = serviceName @@ -404,7 +415,7 @@ func convertTracesInput(span ptrace.Span, resource pcommon.Resource, serviceName return nil, fmt.Errorf("failed to marshal span: %w", err) } - trace := &Trace{ + trace := &TraceInput{ TraceID: span.TraceID().String(), SpanID: span.SpanID().String(), ParentID: span.ParentSpanID().String(), From e4bf5c0a03d6ae554ed6c6a90eff96a13d1270b7 Mon Sep 17 00:00:00 2001 From: Cluas Date: Mon, 21 Oct 2024 22:31:46 +0800 Subject: [PATCH 09/11] fix: rebase --- exporter/qrynexporter/README.md | 7 ------- exporter/qrynexporter/config.go | 3 --- exporter/qrynexporter/traces.go | 8 -------- 3 files changed, 18 deletions(-) diff --git a/exporter/qrynexporter/README.md b/exporter/qrynexporter/README.md index ce0a325..a844bb0 100644 --- a/exporter/qrynexporter/README.md +++ b/exporter/qrynexporter/README.md @@ -20,13 +20,6 @@ - Default: `true` - Description: Enables client-side processing of trace data. This can improve performance but may increase client-side resource usage. -<<<<<<< HEAD -- `dsn` (required): Clickhouse's dsn. -- `clustered_clickhouse` (required): true if clickhouse cluster is used -- `client_side_trace_processing`: use improved traces ingestion algorythm for clickhouse clusters. -Data ingestion is sess performant but more evenly distributed -======= ->>>>>>> e35202d (refactor: improve the code structure and documentation of qrynexporter.) # Example: ## Simple Trace Data diff --git a/exporter/qrynexporter/config.go b/exporter/qrynexporter/config.go index 176c28c..d0e2477 100644 --- a/exporter/qrynexporter/config.go +++ b/exporter/qrynexporter/config.go @@ -30,10 +30,7 @@ type Config struct { configretry.BackOffConfig `mapstructure:"retry_on_failure"` exporterhelper.QueueSettings `mapstructure:"sending_queue"` -<<<<<<< HEAD -======= // ClientSideTraceProcessing is a boolean that indicates whether to process traces on the client side. ->>>>>>> e35202d (refactor: improve the code structure and documentation of qrynexporter.) ClientSideTraceProcessing bool `mapstructure:"client_side_trace_processing"` ClusteredClickhouse bool `mapstructure:"clustered_clickhouse"` diff --git a/exporter/qrynexporter/traces.go b/exporter/qrynexporter/traces.go index a63e27e..fd56330 100644 --- a/exporter/qrynexporter/traces.go +++ b/exporter/qrynexporter/traces.go @@ -67,19 +67,11 @@ func newTracesExporter(logger *zap.Logger, cfg *Config, set *exporter.Settings) return nil, err } exp := &tracesExporter{ -<<<<<<< HEAD - logger: logger, - meter: set.MeterProvider.Meter(typeStr), - db: db, - cluster: cfg.ClusteredClickhouse, - v2: cfg.ClusteredClickhouse && cfg.ClientSideTraceProcessing, -======= logger: logger, meter: set.MeterProvider.Meter(typeStr), db: db, cluster: cfg.ClusteredClickhouse, clientSide: cfg.ClientSideTraceProcessing, ->>>>>>> e35202d (refactor: improve the code structure and documentation of qrynexporter.) 
} if err := initMetrics(exp.meter); err != nil { exp.logger.Error(fmt.Sprintf("failed to init metrics: %s", err.Error())) From a4911bc8dd0fd4506749efed315462d5d1017a28 Mon Sep 17 00:00:00 2001 From: Cluas Date: Mon, 21 Oct 2024 22:33:47 +0800 Subject: [PATCH 10/11] docs: fix --- exporter/qrynexporter/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/exporter/qrynexporter/README.md b/exporter/qrynexporter/README.md index a844bb0..f22c2c9 100644 --- a/exporter/qrynexporter/README.md +++ b/exporter/qrynexporter/README.md @@ -9,7 +9,7 @@ # Configuration options: - `dsn` (required): Data Source Name for Clickhouse. - - Example: `tcp://localhost:9000/?database=cloki` + - Example: `tcp://localhost:9000/qryn` - `clustered_clickhouse` (required): - Type: boolean From cb44a9995f8cd0cbf529ed5cc211d0d218d3804c Mon Sep 17 00:00:00 2001 From: Cluas Date: Mon, 21 Oct 2024 22:37:37 +0800 Subject: [PATCH 11/11] feat: need cfg.ClusteredClickhouse --- exporter/qrynexporter/traces.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/exporter/qrynexporter/traces.go b/exporter/qrynexporter/traces.go index fd56330..dfe6d7e 100644 --- a/exporter/qrynexporter/traces.go +++ b/exporter/qrynexporter/traces.go @@ -71,7 +71,7 @@ func newTracesExporter(logger *zap.Logger, cfg *Config, set *exporter.Settings) meter: set.MeterProvider.Meter(typeStr), db: db, cluster: cfg.ClusteredClickhouse, - clientSide: cfg.ClientSideTraceProcessing, + clientSide: cfg.ClusteredClickhouse && cfg.ClientSideTraceProcessing, } if err := initMetrics(exp.meter); err != nil { exp.logger.Error(fmt.Sprintf("failed to init metrics: %s", err.Error()))
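
Illustrative configuration sketch (not part of the patches above): the pyroscope receiver `metrics` options introduced in PATCH 03 and refined in PATCH 07 could be combined as below. The endpoint and the particular exclusions are assumptions for demonstration; an `exclude_labels` entry with an empty `metric` is taken to apply to every metric, per the README wording "Metric name may be empty".

```yaml
receivers:
  pyroscopereceiver:
    protocols:
      http:
        endpoint: 0.0.0.0:8062           # default endpoint from the README
    metrics:
      enable: true
      exclude_metrics:
        - parsed_body_uncompressed_size_bytes
      exclude_labels:
        - metric: ""                     # empty metric name: drop this label from all metrics
          label: service
        - metric: request_body_uncompressed_size_bytes
          label: encoding                # metric-specific exclusion
```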
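A similar hedged sketch for the qryn exporter options touched by PATCH 05 through 11, assuming the exporter is registered under the `qryn` key; the DSN is a placeholder. Per PATCH 11, `client_side_trace_processing` only takes effect when `clustered_clickhouse` is also true.

```yaml
exporters:
  qryn:
    dsn: tcp://clickhouse:9000/qryn      # placeholder DSN (database name as in PATCH 10)
    clustered_clickhouse: true
    client_side_trace_processing: true   # client-side writes to tempo_traces_dist / tempo_traces_attrs_gin_dist;
                                         # ignored unless clustered_clickhouse is also true (PATCH 11)
```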