Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add dependency processor using Apache Beam #6560

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
205 changes: 205 additions & 0 deletions cmd/jaeger/internal/processors/dependencyprocessor/aggregator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,205 @@
// Copyright (c) 2025 The Jaeger Authors.
// SPDX-License-Identifier: Apache-2.0

package dependencyprocessor
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if this is placed under /processors/ it needs to follow the OTEL Processor framework patterns, like having a factory, etc.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok


import (
"context"
"sync"
"time"

"github.com/apache/beam/sdks/v2/go/pkg/beam"
"github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
"go.opentelemetry.io/collector/component"
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/model"
"github.com/jaegertracing/jaeger/plugin/storage/memory"
)

// dependencyAggregator buffers spans grouped by trace ID and periodically
// runs an Apache Beam pipeline over them to derive service dependency
// links, which are written to the in-memory dependency store.
type dependencyAggregator struct {
	config           *Config
	telset           component.TelemetrySettings
	dependencyWriter *memory.Store
	// traces holds spans of traces that are still being collected;
	// guarded by tracesLock.
	traces     map[model.TraceID]*TraceState
	tracesLock sync.RWMutex
	// closeChan is supplied by the owning processor and closed to stop
	// the background aggregation loop.
	closeChan    chan struct{}
	beamPipeline *beam.Pipeline
	beamScope    beam.Scope
}

// TraceState accumulates the spans seen so far for a single trace.
type TraceState struct {
	spans []*model.Span
	// spanMap indexes the collected spans by span ID for parent lookups.
	spanMap map[model.SpanID]*model.Span
	// lastUpdateTime records when the most recent span arrived.
	lastUpdateTime time.Time
	// timer fires after the configured inactivity timeout to flush traces.
	timer *time.Timer
}

// newDependencyAggregator constructs an aggregator backed by a fresh Beam
// pipeline rooted at its top-level scope.
func newDependencyAggregator(cfg Config, telset component.TelemetrySettings, dependencyWriter *memory.Store) *dependencyAggregator {
	beam.Init()
	pipeline, rootScope := beam.NewPipelineWithRoot()
	agg := &dependencyAggregator{
		config:           &cfg,
		telset:           telset,
		dependencyWriter: dependencyWriter,
		traces:           make(map[model.TraceID]*TraceState),
		beamPipeline:     pipeline,
		beamScope:        rootScope,
	}
	return agg
}

// Start launches the background aggregation loop. The loop flushes the
// buffered traces on every AggregationInterval tick, and performs one final
// flush before returning when closeChan is closed.
func (agg *dependencyAggregator) Start(closeChan chan struct{}) {
	agg.closeChan = closeChan
	go func() {
		ticker := time.NewTicker(agg.config.AggregationInterval)
		defer ticker.Stop()
		for {
			select {
			case <-ticker.C:
				agg.processTraces(context.Background()) // Pass context
			case <-agg.closeChan:
				// Final flush so buffered spans are not lost on shutdown.
				agg.processTraces(context.Background()) // Pass context
				return
			}
		}
	}()
}

// Close stops every pending per-trace inactivity timer. It never fails;
// the error return exists to satisfy the closer contract.
func (agg *dependencyAggregator) Close() error {
	agg.tracesLock.Lock()
	defer agg.tracesLock.Unlock()
	for _, state := range agg.traces {
		if t := state.timer; t != nil {
			t.Stop()
		}
	}
	return nil
}

// HandleSpan records a span under its trace's state and (re)arms the
// trace's inactivity timer.
func (agg *dependencyAggregator) HandleSpan(span *model.Span) {
	agg.tracesLock.Lock()
	defer agg.tracesLock.Unlock()

	traceState, ok := agg.traces[span.TraceID]
	if !ok {
		traceState = &TraceState{
			spans:          []*model.Span{},
			spanMap:        make(map[model.SpanID]*model.Span),
			lastUpdateTime: time.Now(),
		}
		agg.traces[span.TraceID] = traceState
	}

	traceState.spans = append(traceState.spans, span)
	traceState.spanMap[span.SpanID] = span
	traceState.lastUpdateTime = time.Now()

	// Restart this trace's inactivity timer.
	// NOTE(review): the timer callback flushes ALL buffered traces, not
	// just this one — confirm this is intended rather than per-trace
	// completion.
	if traceState.timer != nil {
		traceState.timer.Stop()
	}
	traceState.timer = time.AfterFunc(agg.config.InactivityTimeout, func() {
		agg.processTraces(context.Background()) // Pass context
	})
}

// processTraces atomically swaps out the buffered traces, stops their
// inactivity timers, and feeds all collected spans into the Beam pipeline
// for dependency calculation. It is a no-op when nothing is buffered.
func (agg *dependencyAggregator) processTraces(ctx context.Context) {
	agg.tracesLock.Lock()
	if len(agg.traces) == 0 {
		agg.tracesLock.Unlock()
		return
	}
	// Take ownership of the current batch and reset the buffer so new
	// spans accumulate into a fresh map while this batch is processed.
	traces := agg.traces
	agg.traces = make(map[model.TraceID]*TraceState)
	agg.tracesLock.Unlock()
	for _, traceState := range traces {
		if traceState.timer != nil {
			traceState.timer.Stop()
		}
	}

	beamInput := agg.createBeamInput(traces)
	if beamInput.IsValid() {
		agg.calculateDependencies(ctx, beamInput)
	}
}

// createBeamInput flattens all buffered spans into a single PCollection.
// It returns the zero PCollection (invalid) when there are no spans, which
// the caller checks via IsValid.
func (agg *dependencyAggregator) createBeamInput(traces map[model.TraceID]*TraceState) beam.PCollection {
	var spans []*model.Span
	for _, state := range traces {
		spans = append(spans, state.spans...)
	}
	if len(spans) == 0 {
		return beam.PCollection{}
	}
	return beam.CreateList(agg.beamScope, spans)
}

// calculateDependencies builds and asynchronously runs a Beam pipeline
// that groups the input spans by trace ID, derives parent→child service
// dependency links within each trace, and writes the aggregated links to
// the dependency store.
func (agg *dependencyAggregator) calculateDependencies(ctx context.Context, input beam.PCollection) {
	keyedSpans := beam.ParDo(agg.beamScope, func(s *model.Span) (model.TraceID, *model.Span) {
		return s.TraceID, s
	}, input)

	groupedSpans := beam.GroupByKey(agg.beamScope, keyedSpans)
	depLinks := beam.ParDo(agg.beamScope, func(traceID model.TraceID, spansIter func(**model.Span) bool) []*model.DependencyLink {
		// Materialize this trace's spans into a local TraceState so that
		// parent lookups do not read agg.traces, which processTraces has
		// already reset by the time the pipeline executes (the original
		// code consulted agg.traces and therefore never found parents).
		state := &TraceState{spanMap: make(map[model.SpanID]*model.Span)}
		var span *model.Span
		// The iterator must receive &span to populate it; the original
		// passed the nil pointer itself, so span was never assigned.
		for spansIter(&span) {
			state.spans = append(state.spans, span)
			state.spanMap[span.SpanID] = span
		}
		localTraces := map[model.TraceID]*TraceState{traceID: state}
		deps := map[string]*model.DependencyLink{}
		for _, s := range state.spans {
			processSpan(deps, s, localTraces)
		}
		return depMapToSlice(deps)
	}, groupedSpans)
	flattened := beam.Flatten(agg.beamScope, depLinks)

	beam.ParDo0(agg.beamScope, func(deps []*model.DependencyLink) {
		if err := agg.dependencyWriter.WriteDependencies(ctx, time.Now(), deps); err != nil {
			agg.telset.Logger.Error("Error writing dependencies", zap.Error(err))
		}
	}, flattened)

	go func() {
		if err := beamx.Run(ctx, agg.beamPipeline); err != nil {
			agg.telset.Logger.Error("Error running beam pipeline", zap.Error(err))
		}
		// Beam pipelines are single-use: rebuild one, rooted at its real
		// root scope, for the next round (the original hand-built a Scope
		// that was not attached to the new pipeline).
		// NOTE(review): this reassignment is not synchronized with
		// concurrent processTraces calls — confirm single-flight use.
		p, s := beam.NewPipelineWithRoot()
		agg.beamPipeline = p
		agg.beamScope = s
	}()
}

// processSpan is a copy from the memory storage plugin: it looks up the
// span's parent and, when parent and child belong to different services,
// records or increments the corresponding dependency link in deps.
func processSpan(deps map[string]*model.DependencyLink, s *model.Span, traces map[model.TraceID]*TraceState) {
	parent := seekToSpan(s, traces)
	if parent == nil {
		return
	}
	if parent.Process.ServiceName == s.Process.ServiceName {
		// Intra-service calls do not form a dependency edge.
		return
	}
	key := parent.Process.ServiceName + "&&&" + s.Process.ServiceName
	if link, ok := deps[key]; ok {
		link.CallCount++
		return
	}
	deps[key] = &model.DependencyLink{
		Parent:    parent.Process.ServiceName,
		Child:     s.Process.ServiceName,
		CallCount: 1,
	}
}

// seekToSpan returns the parent span of the given span within the same
// trace, or nil when either the trace or the parent is unknown.
func seekToSpan(span *model.Span, traces map[model.TraceID]*TraceState) *model.Span {
	if state, found := traces[span.TraceID]; found {
		return state.spanMap[span.ParentSpanID()]
	}
	return nil
}

// depMapToSlice flattens the dependency-link map into a slice, in the same
// way as the memory storage plugin. Map iteration order is unspecified, so
// the slice order is not deterministic.
func depMapToSlice(deps map[string]*model.DependencyLink) []*model.DependencyLink {
	links := make([]*model.DependencyLink, 0, len(deps))
	for _, link := range deps {
		links = append(links, link)
	}
	return links
}
15 changes: 15 additions & 0 deletions cmd/jaeger/internal/processors/dependencyprocessor/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
// Copyright (c) 2025 The Jaeger Authors.
// SPDX-License-Identifier: Apache-2.0

package dependencyprocessor

import "time"

// Config holds the tunables of the dependency processor.
// NOTE(review): the sibling files use mapstructure-style OTEL collector
// config; confirm whether yaml tags are the intended wire format here.
type Config struct {
	// AggregationInterval is how often buffered traces are flushed into
	// dependency links.
	AggregationInterval time.Duration `yaml:"aggregation_interval"`
	// InactivityTimeout is how long a trace may be idle before it is
	// considered complete.
	InactivityTimeout time.Duration `yaml:"inactivity_timeout"`
}

// DefaultConfig returns the processor configuration defaults.
func DefaultConfig() Config {
	return Config{
		AggregationInterval: 5 * time.Second, // aggregate dependencies every 5s by default
		InactivityTimeout:   2 * time.Second, // a trace is considered complete after 2s without new spans
	}
}
61 changes: 61 additions & 0 deletions cmd/jaeger/internal/processors/dependencyprocessor/processor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
// Copyright (c) 2025 The Jaeger Authors.
// SPDX-License-Identifier: Apache-2.0

package dependencyprocessor

import (
"context"
"fmt"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/pdata/ptrace"

"github.com/jaegertracing/jaeger/plugin/storage/memory"
"github.com/jaegertracing/jaeger/storage_v2/v1adapter"
)

// dependencyProcessor feeds incoming trace spans into a dependency
// aggregator and stores the derived dependency links in memory.
type dependencyProcessor struct {
	config     *Config
	aggregator *dependencyAggregator // Define the aggregator below.
	telset     component.TelemetrySettings
	dependencyWriter *memory.Store
	// closeChan signals the aggregator's background loop to stop.
	closeChan chan struct{}
}

// newDependencyProcessor creates the processor shell; the aggregator
// itself is constructed lazily in start.
func newDependencyProcessor(cfg Config, telset component.TelemetrySettings, dependencyWriter *memory.Store) *dependencyProcessor {
	dp := &dependencyProcessor{
		config:           &cfg,
		telset:           telset,
		dependencyWriter: dependencyWriter,
		closeChan:        make(chan struct{}),
	}
	return dp
}

// start creates the dependency aggregator and launches its background
// aggregation loop. The host argument is currently unused.
func (tp *dependencyProcessor) start(_ context.Context, host component.Host) error {
	tp.aggregator = newDependencyAggregator(*tp.config, tp.telset, tp.dependencyWriter)
	tp.aggregator.Start(tp.closeChan)
	return nil
}

// close shuts the processor down: it signals the aggregation goroutine
// (which performs a final flush) and then stops the aggregator's pending
// trace timers. Must not be called more than once, since closing an
// already-closed channel panics.
func (tp *dependencyProcessor) close(ctx context.Context) error {
	close(tp.closeChan)
	if tp.aggregator != nil {
		if err := tp.aggregator.Close(); err != nil {
			// Fixed message formatting: no space before the colon.
			return fmt.Errorf("failed to stop the dependency aggregator: %w", err)
		}
	}
	return nil
}

// processTraces converts the incoming OTLP traces to Jaeger model batches
// and hands every span to the aggregator. The input traces are returned
// unmodified for downstream consumers.
func (tp *dependencyProcessor) processTraces(_ context.Context, td ptrace.Traces) (ptrace.Traces, error) {
	batches := v1adapter.ProtoFromTraces(td)
	for _, batch := range batches {
		for _, span := range batch.Spans {
			// Converted spans may carry their process only at the batch
			// level; propagate it so ServiceName is available per span.
			if span.Process == nil {
				span.Process = batch.Process
			}
			tp.aggregator.HandleSpan(span)
		}
	}
	return td, nil
}
72 changes: 55 additions & 17 deletions plugin/storage/memory/memory.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,20 +24,22 @@ type Store struct {
sync.RWMutex
// Each tenant gets a copy of default config.
// In the future this can be extended to contain per-tenant configuration.
defaultConfig Configuration
perTenant map[string]*Tenant
defaultConfig Configuration
perTenant map[string]*Tenant
useNewDependencies bool
}

// Tenant is an in-memory store of traces for a single tenant
type Tenant struct {
	sync.RWMutex
	ids        []*model.TraceID
	traces     map[model.TraceID]*model.Trace
	services   map[string]struct{}
	operations map[string]map[spanstore.Operation]struct{}
	deduper    adjuster.Adjuster
	config     Configuration
	index      int
	// dependencyLinks accumulates pre-aggregated dependency links keyed
	// by "parent&&&child"; populated via WriteDependencies and served by
	// getDependenciesNew.
	dependencyLinks map[string]*model.DependencyLink
}

// NewStore creates an unbounded in-memory store
Expand All @@ -48,19 +50,22 @@ func NewStore() *Store {
// WithConfiguration creates a new in memory storage based on the given configuration
func WithConfiguration(cfg Configuration) *Store {
return &Store{
defaultConfig: cfg,
perTenant: make(map[string]*Tenant),
defaultConfig: cfg,
perTenant: make(map[string]*Tenant),
useNewDependencies: false, // initialize explicitly; the new dependency path is disabled by default
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please use English in comments

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok


}
}

// newTenant constructs a Tenant, pre-allocating the trace ID ring sized by
// the configuration's MaxTraces and an empty dependency-link accumulator.
func newTenant(cfg Configuration) *Tenant {
	return &Tenant{
		ids:             make([]*model.TraceID, cfg.MaxTraces),
		traces:          map[model.TraceID]*model.Trace{},
		services:        map[string]struct{}{},
		operations:      map[string]map[spanstore.Operation]struct{}{},
		deduper:         adjuster.ZipkinSpanIDUniquifier(),
		config:          cfg,
		dependencyLinks: make(map[string]*model.DependencyLink),
	}
}

Expand All @@ -83,6 +88,9 @@ func (st *Store) getTenant(tenantID string) *Tenant {

// GetDependencies returns dependencies between services
func (st *Store) GetDependencies(ctx context.Context, endTs time.Time, lookback time.Duration) ([]model.DependencyLink, error) {
if st.useNewDependencies { // branch to the pre-aggregated dependency path when enabled
return st.getDependenciesNew(ctx)
}
m := st.getTenant(tenancy.GetTenant(ctx))
// deduper used below can modify the spans, so we take an exclusive lock
m.Lock()
Expand Down Expand Up @@ -119,6 +127,36 @@ func (st *Store) GetDependencies(ctx context.Context, endTs time.Time, lookback
return retMe, nil
}

// getDependenciesNew serves dependency links from the tenant's
// pre-aggregated link map, returning value copies under a read lock.
func (st *Store) getDependenciesNew(ctx context.Context) ([]model.DependencyLink, error) {
	tenant := st.getTenant(tenancy.GetTenant(ctx))
	tenant.RLock()
	defer tenant.RUnlock()
	links := make([]model.DependencyLink, 0, len(tenant.dependencyLinks))
	for _, link := range tenant.dependencyLinks {
		links = append(links, *link)
	}
	return links, nil
}

// WriteDependencies merges the given dependency links into the tenant's
// accumulated link set, summing call counts for links that already exist.
// The ts argument is accepted for interface compatibility but not stored.
func (st *Store) WriteDependencies(ctx context.Context, ts time.Time, dependencies []*model.DependencyLink) error {
	m := st.getTenant(tenancy.GetTenant(ctx))
	m.Lock()
	defer m.Unlock()
	for _, dep := range dependencies {
		key := dep.Parent + "&&&" + dep.Child
		if existing, ok := m.dependencyLinks[key]; ok {
			existing.CallCount += dep.CallCount
			continue
		}
		// Store a private copy so later caller-side mutations cannot leak
		// into the store; copying the whole struct also preserves fields
		// beyond Parent/Child/CallCount that the original dropped.
		cp := *dep
		m.dependencyLinks[key] = &cp
	}
	return nil
}

func findSpan(trace *model.Trace, spanID model.SpanID) *model.Span {
for _, s := range trace.Spans {
if s.SpanID == spanID {
Expand Down