Skip to content

Commit

Permalink
feat(dataobj-querier): Fixes streams section sharding & instrumentation (#16349)
Browse files Browse the repository at this point in the history
  • Loading branch information
cyriltovena authored Feb 18, 2025
1 parent 412f733 commit 543ad8e
Show file tree
Hide file tree
Showing 8 changed files with 738 additions and 81 deletions.
5 changes: 3 additions & 2 deletions pkg/dataobj/querier/iter.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package querier
import (
"container/heap"
"context"
"fmt"
"io"
"sort"
"sync"
Expand Down Expand Up @@ -74,7 +75,7 @@ func newEntryIterator(ctx context.Context,
for {
n, err := reader.Read(ctx, buf)
if err != nil && err != io.EOF {
return nil, err
return nil, fmt.Errorf("failed to read log records: %w", err)
}

if n == 0 && err == io.EOF {
Expand Down Expand Up @@ -295,7 +296,7 @@ func newSampleIterator(ctx context.Context,
for {
n, err := reader.Read(ctx, buf)
if err != nil && err != io.EOF {
return nil, err
return nil, fmt.Errorf("failed to read log records: %w", err)
}

// Handle end of stream or empty read
Expand Down
80 changes: 58 additions & 22 deletions pkg/dataobj/querier/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/opentracing/opentracing-go"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"go.uber.org/atomic"
Expand All @@ -18,6 +19,7 @@ import (
"github.com/grafana/loki/v3/pkg/dataobj"
"github.com/grafana/loki/v3/pkg/logproto"
"github.com/grafana/loki/v3/pkg/logql"
util_log "github.com/grafana/loki/v3/pkg/util/log"
)

var streamsPool = sync.Pool{
Expand All @@ -29,11 +31,17 @@ var streamsPool = sync.Pool{

// SelectSeries implements querier.Store
func (s *Store) SelectSeries(ctx context.Context, req logql.SelectLogParams) ([]logproto.SeriesIdentifier, error) {
objects, err := s.objectsForTimeRange(ctx, req.Start, req.End)
logger := util_log.WithContext(ctx, s.logger)

objects, err := s.objectsForTimeRange(ctx, req.Start, req.End, logger)
if err != nil {
return nil, err
}

if len(objects) == 0 {
return nil, nil
}

shard, err := parseShards(req.Shards)
if err != nil {
return nil, err
Expand All @@ -50,7 +58,7 @@ func (s *Store) SelectSeries(ctx context.Context, req logql.SelectLogParams) ([]

uniqueSeries := &sync.Map{}

processor := newStreamProcessor(req.Start, req.End, matchers, objects, shard, s.logger)
processor := newStreamProcessor(req.Start, req.End, matchers, objects, shard, logger)

err = processor.ProcessParallel(ctx, func(h uint64, stream dataobj.Stream) {
uniqueSeries.Store(h, labelsToSeriesIdentifier(stream.Labels))
Expand All @@ -73,13 +81,18 @@ func (s *Store) SelectSeries(ctx context.Context, req logql.SelectLogParams) ([]

// LabelNamesForMetricName implements querier.Store
func (s *Store) LabelNamesForMetricName(ctx context.Context, _ string, from, through model.Time, _ string, matchers ...*labels.Matcher) ([]string, error) {
logger := util_log.WithContext(ctx, s.logger)
start, end := from.Time(), through.Time()
objects, err := s.objectsForTimeRange(ctx, start, end)
objects, err := s.objectsForTimeRange(ctx, start, end, logger)
if err != nil {
return nil, err
}

processor := newStreamProcessor(start, end, matchers, objects, noShard, s.logger)
if len(objects) == 0 {
return nil, nil
}

processor := newStreamProcessor(start, end, matchers, objects, noShard, logger)
uniqueNames := sync.Map{}

err = processor.ProcessParallel(ctx, func(_ uint64, stream dataobj.Stream) {
Expand All @@ -104,6 +117,7 @@ func (s *Store) LabelNamesForMetricName(ctx context.Context, _ string, from, thr

// LabelValuesForMetricName implements querier.Store
func (s *Store) LabelValuesForMetricName(ctx context.Context, _ string, from, through model.Time, _ string, labelName string, matchers ...*labels.Matcher) ([]string, error) {
logger := util_log.WithContext(ctx, s.logger)
start, end := from.Time(), through.Time()

requireLabel, err := labels.NewMatcher(labels.MatchNotEqual, labelName, "")
Expand All @@ -113,12 +127,16 @@ func (s *Store) LabelValuesForMetricName(ctx context.Context, _ string, from, th

matchers = append(matchers, requireLabel)

objects, err := s.objectsForTimeRange(ctx, start, end)
objects, err := s.objectsForTimeRange(ctx, start, end, logger)
if err != nil {
return nil, err
}

processor := newStreamProcessor(start, end, matchers, objects, noShard, s.logger)
if len(objects) == 0 {
return nil, nil
}

processor := newStreamProcessor(start, end, matchers, objects, noShard, logger)
uniqueValues := sync.Map{}

err = processor.ProcessParallel(ctx, func(_ uint64, stream dataobj.Stream) {
Expand All @@ -143,13 +161,13 @@ func (s *Store) LabelValuesForMetricName(ctx context.Context, _ string, from, th
type streamProcessor struct {
predicate dataobj.StreamsPredicate
seenSeries *sync.Map
objects []*dataobj.Object
objects []object
shard logql.Shard
logger log.Logger
}

// newStreamProcessor creates a new streamProcessor with the given parameters
func newStreamProcessor(start, end time.Time, matchers []*labels.Matcher, objects []*dataobj.Object, shard logql.Shard, logger log.Logger) *streamProcessor {
func newStreamProcessor(start, end time.Time, matchers []*labels.Matcher, objects []object, shard logql.Shard, logger log.Logger) *streamProcessor {
return &streamProcessor{
predicate: streamPredicate(matchers, start, end),
seenSeries: &sync.Map{},
Expand All @@ -172,6 +190,10 @@ func (sp *streamProcessor) ProcessParallel(ctx context.Context, onNewStream func
}()

start := time.Now()
span := opentracing.SpanFromContext(ctx)
if span != nil {
span.LogKV("msg", "processing streams", "total_readers", len(readers))
}
level.Debug(sp.logger).Log("msg", "processing streams", "total_readers", len(readers))

// set predicate on all readers
Expand All @@ -185,6 +207,8 @@ func (sp *streamProcessor) ProcessParallel(ctx context.Context, onNewStream func
var processedStreams atomic.Int64
for _, reader := range readers {
g.Go(func() error {
span, ctx := opentracing.StartSpanFromContext(ctx, "streamProcessor.processSingleReader")
defer span.Finish()
n, err := sp.processSingleReader(ctx, reader, onNewStream)
if err != nil {
return err
Expand All @@ -203,6 +227,9 @@ func (sp *streamProcessor) ProcessParallel(ctx context.Context, onNewStream func
"total_streams_processed", processedStreams.Load(),
"duration", time.Since(start),
)
if span != nil {
span.LogKV("msg", "streamProcessor.ProcessParallel done", "total_readers", len(readers), "total_streams_processed", processedStreams.Load(), "duration", time.Since(start))
}

return nil
}
Expand All @@ -221,7 +248,7 @@ func (sp *streamProcessor) processSingleReader(ctx context.Context, reader *data
for {
n, err := reader.Read(ctx, streams)
if err != nil && err != io.EOF {
return processed, err
return processed, fmt.Errorf("failed to read streams: %w", err)
}
if n == 0 && err == io.EOF {
break
Expand Down Expand Up @@ -253,29 +280,38 @@ func labelsToSeriesIdentifier(labels labels.Labels) logproto.SeriesIdentifier {
}

// shardStreamReaders fetches metadata of objects in parallel and shards them into a list of StreamsReaders
func shardStreamReaders(ctx context.Context, objects []*dataobj.Object, shard logql.Shard) ([]*dataobj.StreamsReader, error) {
func shardStreamReaders(ctx context.Context, objects []object, shard logql.Shard) ([]*dataobj.StreamsReader, error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "shardStreamReaders")
defer span.Finish()

span.SetTag("objects", len(objects))

metadatas, err := fetchMetadatas(ctx, objects)
if err != nil {
return nil, err
}

// sectionIndex tracks the global section number across all objects to ensure consistent sharding
var sectionIndex uint64
var readers []*dataobj.StreamsReader
for i, metadata := range metadatas {
for j := 0; j < metadata.StreamsSections; j++ {
// For sharded queries (e.g., "1 of 2"), we only read sections that belong to our shard
// The section is assigned to a shard based on its global index across all objects
if shard.PowerOfTwo != nil && shard.PowerOfTwo.Of > 1 {
if sectionIndex%uint64(shard.PowerOfTwo.Of) != uint64(shard.PowerOfTwo.Shard) {
sectionIndex++
continue
}
if metadata.StreamsSections > 1 {
return nil, fmt.Errorf("unsupported multiple streams sections count: %d", metadata.StreamsSections)
}

// For sharded queries (e.g., "1 of 2"), we only read sections that belong to our shard
// The section is assigned to a shard based on its global index across all objects
if shard.PowerOfTwo != nil && shard.PowerOfTwo.Of > 1 {
if sectionIndex%uint64(shard.PowerOfTwo.Of) != uint64(shard.PowerOfTwo.Shard) {
sectionIndex++
continue
}
reader := streamReaderPool.Get().(*dataobj.StreamsReader)
reader.Reset(objects[i], j)
readers = append(readers, reader)
sectionIndex++
}
reader := streamReaderPool.Get().(*dataobj.StreamsReader)
reader.Reset(objects[i].Object, 0)
readers = append(readers, reader)
sectionIndex++
}
span.LogKV("msg", "shardStreamReaders done", "readers", len(readers))
return readers, nil
}
Loading

0 comments on commit 543ad8e

Please sign in to comment.