Skip to content

Commit

Permalink
Merge branch 'main' into v2-test
Browse files Browse the repository at this point in the history
  • Loading branch information
yurishkuro authored Jan 18, 2025
2 parents 9e0d66e + aa7cf7d commit f918ce0
Show file tree
Hide file tree
Showing 67 changed files with 226 additions and 152 deletions.
2 changes: 1 addition & 1 deletion cmd/all-in-one/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,14 @@ import (
queryApp "github.com/jaegertracing/jaeger/cmd/query/app"
"github.com/jaegertracing/jaeger/cmd/query/app/querysvc"
v2querysvc "github.com/jaegertracing/jaeger/cmd/query/app/querysvc/v2/querysvc"
ss "github.com/jaegertracing/jaeger/internal/sampling/samplingstrategy/metafactory"
"github.com/jaegertracing/jaeger/pkg/config"
"github.com/jaegertracing/jaeger/pkg/jtracer"
"github.com/jaegertracing/jaeger/pkg/metrics"
"github.com/jaegertracing/jaeger/pkg/telemetry"
"github.com/jaegertracing/jaeger/pkg/tenancy"
"github.com/jaegertracing/jaeger/pkg/version"
"github.com/jaegertracing/jaeger/plugin/metricstore"
ss "github.com/jaegertracing/jaeger/plugin/sampling/strategyprovider"
"github.com/jaegertracing/jaeger/plugin/storage"
"github.com/jaegertracing/jaeger/ports"
"github.com/jaegertracing/jaeger/storage_v2/depstore"
Expand Down
3 changes: 1 addition & 2 deletions cmd/collector/app/span_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,8 +227,7 @@ func (sp *spanProcessor) saveSpan(span *model.Span, tenant string) {
}

func (sp *spanProcessor) writeSpan(ctx context.Context, span *model.Span) error {
spanWriter, err := v1adapter.GetV1Writer(sp.traceWriter)
if err == nil {
if spanWriter, ok := v1adapter.GetV1Writer(sp.traceWriter); ok {
return spanWriter.WriteSpan(ctx, span)
}
traces := v1adapter.V1BatchesToTraces([]*model.Batch{{Spans: []*model.Span{span}}})
Expand Down
2 changes: 1 addition & 1 deletion cmd/collector/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,12 @@ import (
cmdFlags "github.com/jaegertracing/jaeger/cmd/internal/flags"
"github.com/jaegertracing/jaeger/cmd/internal/printconfig"
"github.com/jaegertracing/jaeger/cmd/internal/status"
ss "github.com/jaegertracing/jaeger/internal/sampling/samplingstrategy/metafactory"
"github.com/jaegertracing/jaeger/pkg/config"
"github.com/jaegertracing/jaeger/pkg/metrics"
"github.com/jaegertracing/jaeger/pkg/telemetry"
"github.com/jaegertracing/jaeger/pkg/tenancy"
"github.com/jaegertracing/jaeger/pkg/version"
ss "github.com/jaegertracing/jaeger/plugin/sampling/strategyprovider"
"github.com/jaegertracing/jaeger/plugin/storage"
"github.com/jaegertracing/jaeger/ports"
"github.com/jaegertracing/jaeger/storage_v2/v1adapter"
Expand Down
8 changes: 4 additions & 4 deletions cmd/internal/env/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ import (
"github.com/spf13/cobra"
"github.com/spf13/pflag"

ss "github.com/jaegertracing/jaeger/internal/sampling/samplingstrategy/metafactory"
"github.com/jaegertracing/jaeger/plugin/metricstore"
"github.com/jaegertracing/jaeger/plugin/sampling/strategyprovider"
"github.com/jaegertracing/jaeger/plugin/storage"
)

Expand Down Expand Up @@ -66,11 +66,11 @@ func Command() *cobra.Command {
"The type of backend used for service dependencies storage.",
)
fs.String(
strategyprovider.SamplingTypeEnvVar,
ss.SamplingTypeEnvVar,
"file",
fmt.Sprintf(
strings.ReplaceAll(samplingTypeDescription, "\n", " "),
strings.Join(strategyprovider.AllSamplingTypes, ", "),
strings.Join(ss.AllSamplingTypes, ", "),
),
)
fs.String(
Expand All @@ -79,7 +79,7 @@ func Command() *cobra.Command {
fmt.Sprintf(
strings.ReplaceAll(samplingStorageTypeDescription, "\n", " "),
strings.Join(storage.AllSamplingStorageTypes(), ", "),
strategyprovider.SamplingTypeEnvVar,
ss.SamplingTypeEnvVar,
),
)
fs.String(
Expand Down
58 changes: 46 additions & 12 deletions cmd/jaeger/internal/extension/jaegerquery/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/extension"
"go.opentelemetry.io/collector/extension/extensioncapabilities"
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/cmd/jaeger/internal/extension/jaegerstorage"
queryApp "github.com/jaegertracing/jaeger/cmd/query/app"
Expand All @@ -22,7 +23,9 @@ import (
"github.com/jaegertracing/jaeger/pkg/tenancy"
"github.com/jaegertracing/jaeger/plugin/metricstore/disabled"
"github.com/jaegertracing/jaeger/storage/metricstore"
"github.com/jaegertracing/jaeger/storage/spanstore"
"github.com/jaegertracing/jaeger/storage_v2/depstore"
"github.com/jaegertracing/jaeger/storage_v2/tracestore"
"github.com/jaegertracing/jaeger/storage_v2/v1adapter"
)

Expand Down Expand Up @@ -94,7 +97,6 @@ func (s *server) Start(ctx context.Context, host component.Host) error {

var opts querysvc.QueryServiceOptions
var v2opts v2querysvc.QueryServiceOptions
// TODO archive storage still uses v1 factory
if err := s.addArchiveStorage(&opts, &v2opts, host); err != nil {
return err
}
Expand Down Expand Up @@ -140,26 +142,58 @@ func (s *server) addArchiveStorage(
return nil
}

f, err := jaegerstorage.GetStorageFactory(s.config.Storage.TracesArchive, host)
f, err := jaegerstorage.GetTraceStoreFactory(s.config.Storage.TracesArchive, host)
if err != nil {
return fmt.Errorf("cannot find archive storage factory: %w", err)
return fmt.Errorf("cannot find traces archive storage factory: %w", err)
}

if !opts.InitArchiveStorage(f, s.telset.Logger) {
s.telset.Logger.Info("Archive storage not initialized")
traceReader, traceWriter := s.initArchiveStorage(f)
if traceReader == nil || traceWriter == nil {
return nil
}

ar, aw := v1adapter.InitializeArchiveStorage(f, s.telset.Logger)
if ar != nil && aw != nil {
v2opts.ArchiveTraceReader = ar
v2opts.ArchiveTraceWriter = aw
} else {
s.telset.Logger.Info("Archive storage not initialized")
}
v2opts.ArchiveTraceReader = traceReader
v2opts.ArchiveTraceWriter = traceWriter

spanReader, spanWriter := getV1Adapters(traceReader, traceWriter)

opts.ArchiveSpanReader = spanReader
opts.ArchiveSpanWriter = spanWriter

return nil
}

func (s *server) initArchiveStorage(f tracestore.Factory) (tracestore.Reader, tracestore.Writer) {
reader, err := f.CreateTraceReader()
if err != nil {
s.telset.Logger.Error("Cannot init traces archive storage reader", zap.Error(err))
return nil, nil
}
writer, err := f.CreateTraceWriter()
if err != nil {
s.telset.Logger.Error("Cannot init traces archive storage writer", zap.Error(err))
return nil, nil
}
return reader, writer
}

func getV1Adapters(
reader tracestore.Reader,
writer tracestore.Writer,
) (spanstore.Reader, spanstore.Writer) {
v1Reader, ok := v1adapter.GetV1Reader(reader)
if !ok {
v1Reader = v1adapter.NewSpanReader(reader)
}

v1Writer, ok := v1adapter.GetV1Writer(writer)
if !ok {
v1Writer = v1adapter.NewSpanWriter(writer)
}

return v1Reader, v1Writer
}

func (s *server) createMetricReader(host component.Host) (metricstore.Reader, error) {
if s.config.Storage.Metrics == "" {
s.telset.Logger.Info("Metric storage not configured")
Expand Down
72 changes: 54 additions & 18 deletions cmd/jaeger/internal/extension/jaegerquery/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ import (
metricstoremocks "github.com/jaegertracing/jaeger/storage/metricstore/mocks"
"github.com/jaegertracing/jaeger/storage/spanstore"
spanstoremocks "github.com/jaegertracing/jaeger/storage/spanstore/mocks"
"github.com/jaegertracing/jaeger/storage_v2/tracestore"
tracestoremocks "github.com/jaegertracing/jaeger/storage_v2/tracestore/mocks"
"github.com/jaegertracing/jaeger/storage_v2/v1adapter"
)

type fakeFactory struct {
Expand Down Expand Up @@ -63,14 +66,6 @@ func (ff fakeFactory) CreateSpanWriter() (spanstore.Writer, error) {
return &spanstoremocks.Writer{}, nil
}

func (fakeFactory) CreateArchiveSpanReader() (spanstore.Reader, error) {
return &spanstoremocks.Reader{}, nil
}

func (fakeFactory) CreateArchiveSpanWriter() (spanstore.Writer, error) {
return &spanstoremocks.Writer{}, nil
}

func (ff fakeFactory) Initialize(metrics.Factory, *zap.Logger) error {
if ff.name == "need-initialize-error" {
return errors.New("test-error")
Expand Down Expand Up @@ -104,11 +99,6 @@ var _ jaegerstorage.Extension = (*fakeStorageExt)(nil)
func (fakeStorageExt) TraceStorageFactory(name string) (storage.Factory, bool) {
if name == "need-factory-error" {
return nil, false
} else if name == "no-archive" {
f := fakeFactory{name: name}
return struct {
storage.Factory
}{f}, true
}

return fakeFactory{name: name}, true
Expand Down Expand Up @@ -193,7 +183,7 @@ func TestServerStart(t *testing.T) {
TracesPrimary: "jaeger_storage",
},
},
expectedErr: "cannot find archive storage factory",
expectedErr: "cannot find traces archive storage factory",
},
{
name: "metrics storage error",
Expand Down Expand Up @@ -296,19 +286,32 @@ func TestServerAddArchiveStorage(t *testing.T) {
qSvcOpts: &querysvc.QueryServiceOptions{},
v2qSvcOpts: &v2querysvc.QueryServiceOptions{},
expectedOutput: "",
expectedErr: "cannot find archive storage factory: cannot find extension",
expectedErr: "cannot find traces archive storage factory: cannot find extension",
},
{
name: "Archive storage not supported",
name: "Error in trace reader",
config: &Config{
Storage: Storage{
TracesArchive: "no-archive",
TracesArchive: "need-span-reader-error",
},
},
qSvcOpts: &querysvc.QueryServiceOptions{},
v2qSvcOpts: &v2querysvc.QueryServiceOptions{},
extension: fakeStorageExt{},
expectedOutput: "Archive storage not supported by the factory",
expectedOutput: "Cannot init traces archive storage reader",
expectedErr: "",
},
{
name: "Error in trace writer",
config: &Config{
Storage: Storage{
TracesArchive: "need-span-writer-error",
},
},
qSvcOpts: &querysvc.QueryServiceOptions{},
v2qSvcOpts: &v2querysvc.QueryServiceOptions{},
extension: fakeStorageExt{},
expectedOutput: "Cannot init traces archive storage writer",
expectedErr: "",
},
{
Expand Down Expand Up @@ -350,6 +353,39 @@ func TestServerAddArchiveStorage(t *testing.T) {
}
}

func TestGetV1Adapters(t *testing.T) {
tests := []struct {
name string
reader tracestore.Reader
writer tracestore.Writer
expectedReader spanstore.Reader
expectedWriter spanstore.Writer
}{
{
name: "native tracestore.Reader and tracestore.Writer",
reader: &tracestoremocks.Reader{},
writer: &tracestoremocks.Writer{},
expectedReader: v1adapter.NewSpanReader(&tracestoremocks.Reader{}),
expectedWriter: v1adapter.NewSpanWriter(&tracestoremocks.Writer{}),
},
{
name: "wrapped spanstore.Reader and spanstore.Writer",
reader: v1adapter.NewTraceReader(&spanstoremocks.Reader{}),
writer: v1adapter.NewTraceWriter(&spanstoremocks.Writer{}),
expectedReader: &spanstoremocks.Reader{},
expectedWriter: &spanstoremocks.Writer{},
},
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
gotReader, gotWriter := getV1Adapters(test.reader, test.writer)
require.Equal(t, test.expectedReader, gotReader)
require.Equal(t, test.expectedWriter, gotWriter)
})
}
}

func TestServerAddMetricsStorage(t *testing.T) {
host := componenttest.NewNopHost()

Expand Down
2 changes: 1 addition & 1 deletion cmd/jaeger/internal/extension/remotesampling/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
"go.opentelemetry.io/collector/confmap"
"go.opentelemetry.io/collector/featuregate"

"github.com/jaegertracing/jaeger/plugin/sampling/strategyprovider/adaptive"
"github.com/jaegertracing/jaeger/internal/sampling/samplingstrategy/adaptive"
)

var (
Expand Down
8 changes: 4 additions & 4 deletions cmd/jaeger/internal/extension/remotesampling/extension.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@ import (
samplinggrpc "github.com/jaegertracing/jaeger/internal/sampling/grpc"
samplinghttp "github.com/jaegertracing/jaeger/internal/sampling/http"
"github.com/jaegertracing/jaeger/internal/sampling/samplingstrategy"
"github.com/jaegertracing/jaeger/internal/sampling/samplingstrategy/adaptive"
"github.com/jaegertracing/jaeger/internal/sampling/samplingstrategy/file"
"github.com/jaegertracing/jaeger/pkg/metrics"
"github.com/jaegertracing/jaeger/plugin/sampling/strategyprovider/adaptive"
"github.com/jaegertracing/jaeger/plugin/sampling/strategyprovider/static"
"github.com/jaegertracing/jaeger/proto-gen/api_v2"
"github.com/jaegertracing/jaeger/storage"
"github.com/jaegertracing/jaeger/storage/samplingstore"
Expand Down Expand Up @@ -164,7 +164,7 @@ func (ext *rsExtension) Shutdown(ctx context.Context) error {
}

func (ext *rsExtension) startFileBasedStrategyProvider(_ context.Context) error {
opts := static.Options{
opts := file.Options{
StrategiesFile: ext.cfg.File.Path,
ReloadInterval: ext.cfg.File.ReloadInterval,
IncludeDefaultOpStrategies: includeDefaultOpStrategies.IsEnabled(),
Expand All @@ -173,7 +173,7 @@ func (ext *rsExtension) startFileBasedStrategyProvider(_ context.Context) error

// contextcheck linter complains about next line that context is not passed.
//nolint
provider, err := static.NewProvider(opts, ext.telemetry.Logger)
provider, err := file.NewProvider(opts, ext.telemetry.Logger)
if err != nil {
return fmt.Errorf("failed to create the local file strategy store: %w", err)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import (
"google.golang.org/grpc/credentials/insecure"

"github.com/jaegertracing/jaeger/cmd/jaeger/internal/extension/jaegerstorage"
"github.com/jaegertracing/jaeger/plugin/sampling/strategyprovider/adaptive"
"github.com/jaegertracing/jaeger/internal/sampling/samplingstrategy/adaptive"
"github.com/jaegertracing/jaeger/plugin/storage/memory"
"github.com/jaegertracing/jaeger/proto-gen/api_v2"
)
Expand Down
6 changes: 3 additions & 3 deletions cmd/jaeger/internal/extension/remotesampling/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ import (
"go.opentelemetry.io/collector/config/confignet"
"go.opentelemetry.io/collector/extension"

"github.com/jaegertracing/jaeger/plugin/sampling/strategyprovider/adaptive"
"github.com/jaegertracing/jaeger/plugin/sampling/strategyprovider/static"
"github.com/jaegertracing/jaeger/internal/sampling/samplingstrategy/adaptive"
"github.com/jaegertracing/jaeger/internal/sampling/samplingstrategy/file"
"github.com/jaegertracing/jaeger/ports"
)

Expand Down Expand Up @@ -45,7 +45,7 @@ func createDefaultConfig() component.Config {
},
File: &FileConfig{
Path: "", // path needs to be specified
DefaultSamplingProbability: static.DefaultSamplingProbability,
DefaultSamplingProbability: file.DefaultSamplingProbability,
},
Adaptive: &AdaptiveConfig{
SamplingStore: "", // storage name needs to be specified
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
"github.com/jaegertracing/jaeger/cmd/jaeger/internal/extension/remotesampling"
"github.com/jaegertracing/jaeger/internal/metrics/otelmetrics"
"github.com/jaegertracing/jaeger/internal/sampling/samplingstrategy"
"github.com/jaegertracing/jaeger/plugin/sampling/strategyprovider/adaptive"
"github.com/jaegertracing/jaeger/internal/sampling/samplingstrategy/adaptive"
"github.com/jaegertracing/jaeger/storage_v2/v1adapter"
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ import (

"github.com/jaegertracing/jaeger/cmd/jaeger/internal/extension/jaegerstorage"
"github.com/jaegertracing/jaeger/cmd/jaeger/internal/extension/remotesampling"
"github.com/jaegertracing/jaeger/internal/sampling/samplingstrategy/adaptive"
"github.com/jaegertracing/jaeger/model"
"github.com/jaegertracing/jaeger/plugin/sampling/strategyprovider/adaptive"
"github.com/jaegertracing/jaeger/plugin/storage/memory"
)

Expand Down
4 changes: 2 additions & 2 deletions cmd/query/app/querysvc/query_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,8 @@ type TraceQueryParameters struct {

// NewQueryService returns a new QueryService.
func NewQueryService(traceReader tracestore.Reader, dependencyReader depstore.Reader, options QueryServiceOptions) *QueryService {
spanReader, err := v1adapter.GetV1Reader(traceReader)
if err != nil {
spanReader, ok := v1adapter.GetV1Reader(traceReader)
if !ok {
// if the spanstore.Reader is not available, downgrade the native tracestore.Reader to
// a spanstore.Reader
spanReader = v1adapter.NewSpanReader(traceReader)
Expand Down
Loading

0 comments on commit f918ce0

Please sign in to comment.