-
Notifications
You must be signed in to change notification settings - Fork 2.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: add dependency processor using Apache Beam #6560
Open
yunmaoQu
wants to merge
1
commit into
jaegertracing:main
Choose a base branch
from
yunmaoQu:add-dependency-processor
base: main
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
+336
−17
Open
Changes from all commits
Commits
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
205 changes: 205 additions & 0 deletions
205
cmd/jaeger/internal/processors/dependencyprocessor/aggregator.go
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,205 @@ | ||
// Copyright (c) 2025 The Jaeger Authors. | ||
// SPDX-License-Identifier: Apache-2.0 | ||
|
||
package dependencyprocessor | ||
|
||
import ( | ||
"context" | ||
"sync" | ||
"time" | ||
|
||
"github.com/apache/beam/sdks/v2/go/pkg/beam" | ||
"github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx" | ||
"go.opentelemetry.io/collector/component" | ||
"go.uber.org/zap" | ||
|
||
"github.com/jaegertracing/jaeger/model" | ||
"github.com/jaegertracing/jaeger/plugin/storage/memory" | ||
) | ||
|
||
type dependencyAggregator struct { | ||
config *Config | ||
telset component.TelemetrySettings | ||
dependencyWriter *memory.Store | ||
traces map[model.TraceID]*TraceState | ||
tracesLock sync.RWMutex | ||
closeChan chan struct{} | ||
beamPipeline *beam.Pipeline | ||
beamScope beam.Scope | ||
} | ||
|
||
type TraceState struct { | ||
spans []*model.Span | ||
spanMap map[model.SpanID]*model.Span | ||
lastUpdateTime time.Time | ||
timer *time.Timer | ||
} | ||
|
||
func newDependencyAggregator(cfg Config, telset component.TelemetrySettings, dependencyWriter *memory.Store) *dependencyAggregator { | ||
beam.Init() | ||
p, s := beam.NewPipelineWithRoot() | ||
return &dependencyAggregator{ | ||
config: &cfg, | ||
telset: telset, | ||
dependencyWriter: dependencyWriter, | ||
traces: make(map[model.TraceID]*TraceState), | ||
beamPipeline: p, | ||
beamScope: s, | ||
} | ||
} | ||
|
||
func (agg *dependencyAggregator) Start(closeChan chan struct{}) { | ||
agg.closeChan = closeChan | ||
go func() { | ||
ticker := time.NewTicker(agg.config.AggregationInterval) | ||
defer ticker.Stop() | ||
for { | ||
select { | ||
case <-ticker.C: | ||
agg.processTraces(context.Background()) // Pass context | ||
case <-agg.closeChan: | ||
agg.processTraces(context.Background()) // Pass context | ||
return | ||
} | ||
} | ||
}() | ||
} | ||
|
||
func (agg *dependencyAggregator) Close() error { | ||
agg.tracesLock.Lock() | ||
defer agg.tracesLock.Unlock() | ||
for _, traceState := range agg.traces { | ||
if traceState.timer != nil { | ||
traceState.timer.Stop() | ||
} | ||
} | ||
return nil | ||
} | ||
|
||
func (agg *dependencyAggregator) HandleSpan(span *model.Span) { | ||
agg.tracesLock.Lock() | ||
defer agg.tracesLock.Unlock() | ||
|
||
traceState, ok := agg.traces[span.TraceID] | ||
if !ok { | ||
traceState = &TraceState{ | ||
spans: []*model.Span{}, | ||
spanMap: make(map[model.SpanID]*model.Span), | ||
lastUpdateTime: time.Now(), | ||
} | ||
agg.traces[span.TraceID] = traceState | ||
} | ||
|
||
traceState.spans = append(traceState.spans, span) | ||
traceState.spanMap[span.SpanID] = span | ||
traceState.lastUpdateTime = time.Now() | ||
|
||
if traceState.timer != nil { | ||
traceState.timer.Stop() | ||
} | ||
traceState.timer = time.AfterFunc(agg.config.InactivityTimeout, func() { | ||
agg.processTraces(context.Background()) // Pass context | ||
}) | ||
} | ||
|
||
func (agg *dependencyAggregator) processTraces(ctx context.Context) { | ||
agg.tracesLock.Lock() | ||
if len(agg.traces) == 0 { | ||
agg.tracesLock.Unlock() | ||
return | ||
} | ||
traces := agg.traces | ||
agg.traces = make(map[model.TraceID]*TraceState) | ||
agg.tracesLock.Unlock() | ||
for _, traceState := range traces { | ||
if traceState.timer != nil { | ||
traceState.timer.Stop() | ||
} | ||
} | ||
|
||
beamInput := agg.createBeamInput(traces) | ||
if beamInput.IsValid() { | ||
agg.calculateDependencies(ctx, beamInput) | ||
} | ||
} | ||
|
||
func (agg *dependencyAggregator) createBeamInput(traces map[model.TraceID]*TraceState) beam.PCollection { | ||
var allSpans []*model.Span | ||
for _, traceState := range traces { | ||
allSpans = append(allSpans, traceState.spans...) | ||
} | ||
if len(allSpans) == 0 { | ||
return beam.PCollection{} | ||
} | ||
return beam.CreateList(agg.beamScope, allSpans) | ||
} | ||
|
||
func (agg *dependencyAggregator) calculateDependencies(ctx context.Context, input beam.PCollection) { | ||
keyedSpans := beam.ParDo(agg.beamScope, func(s *model.Span) (model.TraceID, *model.Span) { | ||
return s.TraceID, s | ||
}, input) | ||
|
||
groupedSpans := beam.GroupByKey(agg.beamScope, keyedSpans) | ||
depLinks := beam.ParDo(agg.beamScope, func(_ model.TraceID, spansIter func(*model.Span) bool) []*model.DependencyLink { | ||
deps := map[string]*model.DependencyLink{} | ||
var span *model.Span | ||
for spansIter(span) { | ||
processSpan(deps, span, agg.traces) | ||
} | ||
return depMapToSlice(deps) | ||
}, groupedSpans) | ||
flattened := beam.Flatten(agg.beamScope, depLinks) | ||
|
||
beam.ParDo0(agg.beamScope, func(deps []*model.DependencyLink) { | ||
err := agg.dependencyWriter.WriteDependencies(ctx, time.Now(), deps) | ||
if err != nil { | ||
agg.telset.Logger.Error("Error writing dependencies", zap.Error(err)) | ||
} | ||
}, flattened) | ||
|
||
go func() { | ||
err := beamx.Run(ctx, agg.beamPipeline) | ||
if err != nil { | ||
agg.telset.Logger.Error("Error running beam pipeline", zap.Error(err)) | ||
} | ||
agg.beamPipeline = beam.NewPipeline() | ||
agg.beamScope = beam.Scope{Parent: beam.PipelineScope{ID: "dependency"}, Name: "dependency"} | ||
}() | ||
} | ||
|
||
// processSpan is a copy from the memory storage plugin | ||
func processSpan(deps map[string]*model.DependencyLink, s *model.Span, traces map[model.TraceID]*TraceState) { | ||
parentSpan := seekToSpan(s, traces) | ||
if parentSpan != nil { | ||
if parentSpan.Process.ServiceName == s.Process.ServiceName { | ||
return | ||
} | ||
depKey := parentSpan.Process.ServiceName + "&&&" + s.Process.ServiceName | ||
if _, ok := deps[depKey]; !ok { | ||
deps[depKey] = &model.DependencyLink{ | ||
Parent: parentSpan.Process.ServiceName, | ||
Child: s.Process.ServiceName, | ||
CallCount: 1, | ||
} | ||
} else { | ||
deps[depKey].CallCount++ | ||
} | ||
} | ||
} | ||
|
||
func seekToSpan(span *model.Span, traces map[model.TraceID]*TraceState) *model.Span { | ||
traceState, ok := traces[span.TraceID] | ||
if !ok { | ||
return nil | ||
} | ||
return traceState.spanMap[span.ParentSpanID()] | ||
} | ||
|
||
// depMapToSlice modifies the spans to DependencyLink in the same way as the memory storage plugin | ||
func depMapToSlice(deps map[string]*model.DependencyLink) []*model.DependencyLink { | ||
retMe := make([]*model.DependencyLink, 0, len(deps)) | ||
for _, dep := range deps { | ||
retMe = append(retMe, dep) | ||
} | ||
return retMe | ||
} |
15 changes: 15 additions & 0 deletions
15
cmd/jaeger/internal/processors/dependencyprocessor/config.go
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,15 @@ | ||
package dependencyprocessor | ||
|
||
import "time" | ||
|
||
type Config struct { | ||
AggregationInterval time.Duration `yaml:"aggregation_interval"` | ||
InactivityTimeout time.Duration `yaml:"inactivity_timeout"` | ||
} | ||
|
||
func DefaultConfig() Config { | ||
return Config{ | ||
AggregationInterval: 5 * time.Second, // 默认每5秒聚合一次依赖 | ||
InactivityTimeout: 2 * time.Second, // 默认trace不活跃2秒后视为完成 | ||
} | ||
} |
61 changes: 61 additions & 0 deletions
61
cmd/jaeger/internal/processors/dependencyprocessor/processor.go
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,61 @@ | ||
// Copyright (c) 2025 The Jaeger Authors. | ||
// SPDX-License-Identifier: Apache-2.0 | ||
|
||
package dependencyprocessor | ||
|
||
import ( | ||
"context" | ||
"fmt" | ||
|
||
"go.opentelemetry.io/collector/component" | ||
"go.opentelemetry.io/collector/pdata/ptrace" | ||
|
||
"github.com/jaegertracing/jaeger/plugin/storage/memory" | ||
"github.com/jaegertracing/jaeger/storage_v2/v1adapter" | ||
) | ||
|
||
type dependencyProcessor struct { | ||
config *Config | ||
aggregator *dependencyAggregator // Define the aggregator below. | ||
telset component.TelemetrySettings | ||
dependencyWriter *memory.Store | ||
closeChan chan struct{} | ||
} | ||
|
||
func newDependencyProcessor(cfg Config, telset component.TelemetrySettings, dependencyWriter *memory.Store) *dependencyProcessor { | ||
return &dependencyProcessor{ | ||
config: &cfg, | ||
telset: telset, | ||
dependencyWriter: dependencyWriter, | ||
closeChan: make(chan struct{}), | ||
} | ||
} | ||
|
||
func (tp *dependencyProcessor) start(_ context.Context, host component.Host) error { | ||
tp.aggregator = newDependencyAggregator(*tp.config, tp.telset, tp.dependencyWriter) | ||
tp.aggregator.Start(tp.closeChan) | ||
return nil | ||
} | ||
|
||
func (tp *dependencyProcessor) close(ctx context.Context) error { | ||
close(tp.closeChan) | ||
if tp.aggregator != nil { | ||
if err := tp.aggregator.Close(); err != nil { | ||
return fmt.Errorf("failed to stop the dependency aggregator : %w", err) | ||
} | ||
} | ||
return nil | ||
} | ||
|
||
func (tp *dependencyProcessor) processTraces(_ context.Context, td ptrace.Traces) (ptrace.Traces, error) { | ||
batches := v1adapter.ProtoFromTraces(td) | ||
for _, batch := range batches { | ||
for _, span := range batch.Spans { | ||
if span.Process == nil { | ||
span.Process = batch.Process | ||
} | ||
tp.aggregator.HandleSpan(span) | ||
} | ||
} | ||
return td, nil | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -24,20 +24,22 @@ type Store struct { | |
sync.RWMutex | ||
// Each tenant gets a copy of default config. | ||
// In the future this can be extended to contain per-tenant configuration. | ||
defaultConfig Configuration | ||
perTenant map[string]*Tenant | ||
defaultConfig Configuration | ||
perTenant map[string]*Tenant | ||
useNewDependencies bool | ||
} | ||
|
||
// Tenant is an in-memory store of traces for a single tenant | ||
type Tenant struct { | ||
sync.RWMutex | ||
ids []*model.TraceID | ||
traces map[model.TraceID]*model.Trace | ||
services map[string]struct{} | ||
operations map[string]map[spanstore.Operation]struct{} | ||
deduper adjuster.Adjuster | ||
config Configuration | ||
index int | ||
ids []*model.TraceID | ||
traces map[model.TraceID]*model.Trace | ||
services map[string]struct{} | ||
operations map[string]map[spanstore.Operation]struct{} | ||
deduper adjuster.Adjuster | ||
config Configuration | ||
index int | ||
dependencyLinks map[string]*model.DependencyLink | ||
} | ||
|
||
// NewStore creates an unbounded in-memory store | ||
|
@@ -48,19 +50,22 @@ func NewStore() *Store { | |
// WithConfiguration creates a new in memory storage based on the given configuration | ||
func WithConfiguration(cfg Configuration) *Store { | ||
return &Store{ | ||
defaultConfig: cfg, | ||
perTenant: make(map[string]*Tenant), | ||
defaultConfig: cfg, | ||
perTenant: make(map[string]*Tenant), | ||
useNewDependencies: false, // 添加初始化 | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. please use English in comments There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ok |
||
|
||
} | ||
} | ||
|
||
func newTenant(cfg Configuration) *Tenant { | ||
return &Tenant{ | ||
ids: make([]*model.TraceID, cfg.MaxTraces), | ||
traces: map[model.TraceID]*model.Trace{}, | ||
services: map[string]struct{}{}, | ||
operations: map[string]map[spanstore.Operation]struct{}{}, | ||
deduper: adjuster.ZipkinSpanIDUniquifier(), | ||
config: cfg, | ||
ids: make([]*model.TraceID, cfg.MaxTraces), | ||
traces: map[model.TraceID]*model.Trace{}, | ||
services: map[string]struct{}{}, | ||
operations: map[string]map[spanstore.Operation]struct{}{}, | ||
deduper: adjuster.ZipkinSpanIDUniquifier(), | ||
config: cfg, | ||
dependencyLinks: make(map[string]*model.DependencyLink), | ||
} | ||
} | ||
|
||
|
@@ -83,6 +88,9 @@ func (st *Store) getTenant(tenantID string) *Tenant { | |
|
||
// GetDependencies returns dependencies between services | ||
func (st *Store) GetDependencies(ctx context.Context, endTs time.Time, lookback time.Duration) ([]model.DependencyLink, error) { | ||
if st.useNewDependencies { // 添加条件判断 | ||
return st.getDependenciesNew(ctx) | ||
} | ||
m := st.getTenant(tenancy.GetTenant(ctx)) | ||
// deduper used below can modify the spans, so we take an exclusive lock | ||
m.Lock() | ||
|
@@ -119,6 +127,36 @@ func (st *Store) GetDependencies(ctx context.Context, endTs time.Time, lookback | |
return retMe, nil | ||
} | ||
|
||
func (st *Store) getDependenciesNew(ctx context.Context) ([]model.DependencyLink, error) { | ||
m := st.getTenant(tenancy.GetTenant(ctx)) | ||
m.RLock() | ||
defer m.RUnlock() | ||
retMe := make([]model.DependencyLink, 0, len(m.dependencyLinks)) | ||
for _, dep := range m.dependencyLinks { | ||
retMe = append(retMe, *dep) | ||
} | ||
return retMe, nil | ||
} | ||
|
||
func (st *Store) WriteDependencies(ctx context.Context, ts time.Time, dependencies []*model.DependencyLink) error { | ||
m := st.getTenant(tenancy.GetTenant(ctx)) | ||
m.Lock() | ||
defer m.Unlock() | ||
for _, dep := range dependencies { | ||
key := dep.Parent + "&&&" + dep.Child | ||
if _, ok := m.dependencyLinks[key]; !ok { | ||
m.dependencyLinks[key] = &model.DependencyLink{ | ||
Parent: dep.Parent, | ||
Child: dep.Child, | ||
CallCount: dep.CallCount, | ||
} | ||
} else { | ||
m.dependencyLinks[key].CallCount += dep.CallCount | ||
} | ||
} | ||
return nil | ||
} | ||
|
||
func findSpan(trace *model.Trace, spanID model.SpanID) *model.Span { | ||
for _, s := range trace.Spans { | ||
if s.SpanID == spanID { | ||
|
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if this is placed under /processors/ is needs to follow the OTEL Processor framework patterns, like having a factory, etc.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok