diff --git a/pkg/stanza/fileconsumer/internal/checkpoint/checkpoint.go b/pkg/stanza/fileconsumer/internal/checkpoint/checkpoint.go index 8a5a60b7d734..382ad045951d 100644 --- a/pkg/stanza/fileconsumer/internal/checkpoint/checkpoint.go +++ b/pkg/stanza/fileconsumer/internal/checkpoint/checkpoint.go @@ -10,6 +10,8 @@ import ( "errors" "fmt" + "go.opentelemetry.io/collector/extension/experimental/storage" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/reader" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator" ) @@ -21,7 +23,7 @@ func Save(ctx context.Context, persister operator.Persister, rmds []*reader.Meta return SaveKey(ctx, persister, rmds, knownFilesKey) } -func SaveKey(ctx context.Context, persister operator.Persister, rmds []*reader.Metadata, key string) error { +func SaveKey(ctx context.Context, persister operator.Persister, rmds []*reader.Metadata, key string, ops ...storage.Operation) error { var buf bytes.Buffer enc := json.NewEncoder(&buf) @@ -37,8 +39,8 @@ func SaveKey(ctx context.Context, persister operator.Persister, rmds []*reader.M errs = append(errs, fmt.Errorf("encode metadata: %w", err)) } } - - if err := persister.Set(ctx, key, buf.Bytes()); err != nil { + ops = append(ops, storage.SetOperation(key, buf.Bytes())) + if err := persister.Batch(ctx, ops...); err != nil { errs = append(errs, fmt.Errorf("persist known files: %w", err)) } diff --git a/pkg/stanza/fileconsumer/internal/tracker/tracker.go b/pkg/stanza/fileconsumer/internal/tracker/tracker.go index c784d1c3485a..c8a3b702c3d2 100644 --- a/pkg/stanza/fileconsumer/internal/tracker/tracker.go +++ b/pkg/stanza/fileconsumer/internal/tracker/tracker.go @@ -4,10 +4,14 @@ package tracker // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/tracker" import ( + "bytes" "context" + "encoding/json" + "errors" "fmt" "go.opentelemetry.io/collector/component" + 
"go.opentelemetry.io/collector/extension/experimental/storage" "go.uber.org/zap" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/checkpoint" @@ -52,13 +56,19 @@ type fileTracker struct { archiveIndex int } +var errInvalidValue = errors.New("invalid value") + +var archiveIndexKey = "knownFilesArchiveIndex" +var archivePollsToArchiveKey = "knownFilesPollsToArchive" + func NewFileTracker(set component.TelemetrySettings, maxBatchFiles int, pollsToArchive int, persister operator.Persister) Tracker { knownFiles := make([]*fileset.Fileset[*reader.Metadata], 3) for i := 0; i < len(knownFiles); i++ { knownFiles[i] = fileset.New[*reader.Metadata](maxBatchFiles) } set.Logger = set.Logger.With(zap.String("tracker", "fileTracker")) - return &fileTracker{ + + t := &fileTracker{ set: set, maxBatchFiles: maxBatchFiles, currentPollFiles: fileset.New[*reader.Reader](maxBatchFiles), @@ -68,6 +78,11 @@ func NewFileTracker(set component.TelemetrySettings, maxBatchFiles int, pollsToA persister: persister, archiveIndex: 0, } + if t.archiveEnabled() { + t.restoreArchiveIndex(context.Background()) + } + + return t } func (t *fileTracker) Add(reader *reader.Reader) { @@ -131,7 +146,9 @@ func (t *fileTracker) EndPoll() { // t.knownFiles[0] -> t.knownFiles[1] -> t.knownFiles[2] // Instead of throwing it away, archive it. - t.archive(t.knownFiles[2]) + if t.archiveEnabled() { + t.archive(t.knownFiles[2]) + } copy(t.knownFiles[1:], t.knownFiles) t.knownFiles[0] = fileset.New[*reader.Metadata](t.maxBatchFiles) } @@ -144,6 +161,109 @@ func (t *fileTracker) TotalReaders() int { return total } +func (t *fileTracker) restoreArchiveIndex(ctx context.Context) { + byteIndex, err := t.persister.Get(ctx, archiveIndexKey) + if err != nil { + t.set.Logger.Error("error while reading the archiveIndexKey. 
Starting from 0", zap.Error(err)) + t.archiveIndex = 0 + return + } + + previousPollsToArchive, err := t.previousPollsToArchive(ctx) + if err != nil { + // if there's an error reading previousPollsToArchive, default to current value + previousPollsToArchive = t.pollsToArchive + } + + t.archiveIndex, err = decodeIndex(byteIndex) + if err != nil { + t.set.Logger.Error("error getting read index. Starting from 0", zap.Error(err)) + } else if previousPollsToArchive < t.pollsToArchive { + // if archive size has increased, we just increment the index until we encounter a nil value + for t.archiveIndex < t.pollsToArchive && t.isSet(ctx, t.archiveIndex) { + t.archiveIndex++ + } + } else if previousPollsToArchive > t.pollsToArchive { + // we will only attempt to rewrite archive if the archive size has shrunk + t.set.Logger.Warn("polls_to_archive has changed. Will attempt to rewrite archive") + t.rewriteArchive(ctx, previousPollsToArchive) + } + + t.removeExtraKeys(ctx) + + // store current pollsToArchive + if err := t.persister.Set(ctx, archivePollsToArchiveKey, encodeIndex(t.pollsToArchive)); err != nil { + t.set.Logger.Error("Error storing polls_to_archive", zap.Error(err)) + } +} + +func (t *fileTracker) rewriteArchive(ctx context.Context, previousPollsToArchive int) { + // Ensure archiveIndex is non-negative + if t.archiveIndex < 0 { + t.archiveIndex = 0 + return + } + // Function to swap data between two indices + swapData := func(idx1, idx2 int) error { + val1, err := t.persister.Get(ctx, archiveKey(idx1)) + if err != nil { + return err + } + val2, err := t.persister.Get(ctx, archiveKey(idx2)) + if err != nil { + return err + } + return t.persister.Batch(ctx, storage.SetOperation(archiveKey(idx1), val2), storage.SetOperation(archiveKey(idx2), val1)) + } + // Calculate the least recent index, w.r.t. 
new archive size + + leastRecentIndex := mod(t.archiveIndex-t.pollsToArchive, previousPollsToArchive) + // Refer archive.md for the detailed design + + if mod(t.archiveIndex-1, previousPollsToArchive) > t.pollsToArchive { + for i := 0; i < t.pollsToArchive; i++ { + if err := swapData(i, leastRecentIndex); err != nil { + t.set.Logger.Error("error while swapping archive", zap.Error(err)) + } + leastRecentIndex = (leastRecentIndex + 1) % previousPollsToArchive + } + t.archiveIndex = 0 + } else { + if !t.isSet(ctx, t.archiveIndex) { + // If the current index points at an unset key, no need to do anything + return + } + for i := 0; i < t.pollsToArchive-t.archiveIndex; i++ { + if err := swapData(t.archiveIndex+i, leastRecentIndex); err != nil { + t.set.Logger.Warn("error while swapping archive", zap.Error(err)) + } + leastRecentIndex = (leastRecentIndex + 1) % previousPollsToArchive + } + } +} + +func (t *fileTracker) removeExtraKeys(ctx context.Context) { + for i := t.pollsToArchive; t.isSet(ctx, i); i++ { + if err := t.persister.Delete(ctx, archiveKey(i)); err != nil { + t.set.Logger.Error("error while cleaning extra keys", zap.Error(err)) + } + } +} + +func (t *fileTracker) previousPollsToArchive(ctx context.Context) (int, error) { + byteIndex, err := t.persister.Get(ctx, archivePollsToArchiveKey) + if err != nil { + t.set.Logger.Error("error while reading the archivePollsToArchiveKey", zap.Error(err)) + return 0, err + } + previousPollsToArchive, err := decodeIndex(byteIndex) + if err != nil { + t.set.Logger.Error("error while decoding previousPollsToArchive", zap.Error(err)) + return 0, err + } + return previousPollsToArchive, nil +} + func (t *fileTracker) archive(metadata *fileset.Fileset[*reader.Metadata]) { // We make use of a ring buffer, where each set of files is stored under a specific index. // Instead of discarding knownFiles[2], write it to the next index and eventually roll over. 
@@ -162,19 +282,17 @@ func (t *fileTracker) archive(metadata *fileset.Fileset[*reader.Metadata]) { // start // index - if t.pollsToArchive <= 0 || t.persister == nil { - return - } - if err := t.writeArchive(t.archiveIndex, metadata); err != nil { + index := t.archiveIndex + t.archiveIndex = (t.archiveIndex + 1) % t.pollsToArchive // increment the index + indexOp := storage.SetOperation(archiveIndexKey, encodeIndex(t.archiveIndex)) // batch the updated index with metadata + if err := t.writeArchive(index, metadata, indexOp); err != nil { t.set.Logger.Error("error faced while saving to the archive", zap.Error(err)) } - t.archiveIndex = (t.archiveIndex + 1) % t.pollsToArchive // increment the index } // readArchive loads data from the archive for a given index and returns a fileset.Filset. func (t *fileTracker) readArchive(index int) (*fileset.Fileset[*reader.Metadata], error) { - key := fmt.Sprintf("knownFiles%d", index) - metadata, err := checkpoint.LoadKey(context.Background(), t.persister, key) + metadata, err := checkpoint.LoadKey(context.Background(), t.persister, archiveKey(index)) if err != nil { return nil, err } @@ -184,9 +302,17 @@ func (t *fileTracker) readArchive(index int) (*fileset.Fileset[*reader.Metadata] } // writeArchive saves data to the archive for a given index and returns an error, if encountered. -func (t *fileTracker) writeArchive(index int, rmds *fileset.Fileset[*reader.Metadata]) error { - key := fmt.Sprintf("knownFiles%d", index) - return checkpoint.SaveKey(context.Background(), t.persister, rmds.Get(), key) +func (t *fileTracker) writeArchive(index int, rmds *fileset.Fileset[*reader.Metadata], ops ...storage.Operation) error { + return checkpoint.SaveKey(context.Background(), t.persister, rmds.Get(), archiveKey(index), ops...) 
+} + +func (t *fileTracker) archiveEnabled() bool { + return t.pollsToArchive > 0 && t.persister != nil +} + +func (t *fileTracker) isSet(ctx context.Context, index int) bool { + val, err := t.persister.Get(ctx, archiveKey(index)) + return val != nil && err == nil } // FindFiles goes through archive, one fileset at a time and tries to match all fingerprints against that loaded set. @@ -295,3 +421,31 @@ func (t *noStateTracker) EndPoll() {} func (t *noStateTracker) TotalReaders() int { return 0 } func (t *noStateTracker) FindFiles([]*fingerprint.Fingerprint) []*reader.Metadata { return nil } + +func encodeIndex(val int) []byte { + var buf bytes.Buffer + enc := json.NewEncoder(&buf) + + // Encode the index + if err := enc.Encode(val); err != nil { + return nil + } + return buf.Bytes() +} + +func decodeIndex(buf []byte) (int, error) { + var index int + + // Decode the index + dec := json.NewDecoder(bytes.NewReader(buf)) + err := dec.Decode(&index) + return max(index, 0), err +} + +func archiveKey(i int) string { + return fmt.Sprintf("knownFiles%d", i) +} + +func mod(x, y int) int { + return (x + y) % y +} diff --git a/pkg/stanza/fileconsumer/internal/tracker/tracker_test.go b/pkg/stanza/fileconsumer/internal/tracker/tracker_test.go index f16e2d647032..9d130881449b 100644 --- a/pkg/stanza/fileconsumer/internal/tracker/tracker_test.go +++ b/pkg/stanza/fileconsumer/internal/tracker/tracker_test.go @@ -13,6 +13,7 @@ import ( "go.opentelemetry.io/collector/component/componenttest" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/checkpoint" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/fileset" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/fingerprint" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/reader" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator" @@ -44,6 
+45,36 @@ func TestFindFilesOrder(t *testing.T) { } } +func TestIndexInBounds(t *testing.T) { + persister := testutil.NewUnscopedMockPersister() + pollsToArchive := 100 + tracker := NewFileTracker(componenttest.NewNopTelemetrySettings(), 0, pollsToArchive, persister).(*fileTracker) + + // no index exists. archiveIndex should be 0 + require.Equal(t, 0, tracker.archiveIndex) + + // run archiving. Each time, index should be in bound. + for i := 0; i < 1099; i++ { + require.Equalf(t, i%pollsToArchive, tracker.archiveIndex, "Index should %d, but was %d", i%pollsToArchive, tracker.archiveIndex) + tracker.archive(&fileset.Fileset[*reader.Metadata]{}) + require.Truef(t, tracker.archiveIndex >= 0 && tracker.archiveIndex < pollsToArchive, "Index should be between 0 and %d, but was %d", pollsToArchive, tracker.archiveIndex) + } + oldIndex := tracker.archiveIndex + + // re-create archive + tracker = NewFileTracker(componenttest.NewNopTelemetrySettings(), 0, pollsToArchive, persister).(*fileTracker) + + // index should exist and new archiveIndex should be equal to oldIndex + require.Equalf(t, oldIndex, tracker.archiveIndex, "New index should %d, but was %d", oldIndex, tracker.archiveIndex) + + // re-create archive, with reduced pollsToArchive + pollsToArchive = 70 + tracker = NewFileTracker(componenttest.NewNopTelemetrySettings(), 0, pollsToArchive, persister).(*fileTracker) + + // index should exist but it is out of bounds. 
So it should reset to 0 + require.Equalf(t, 0, tracker.archiveIndex, "Index should be reset to 0 but was %d", tracker.archiveIndex) +} + func populatedPersisterData(persister operator.Persister, fps []*fingerprint.Fingerprint) []bool { md := make([]*reader.Metadata, 0) diff --git a/pkg/stanza/operator/persister.go b/pkg/stanza/operator/persister.go index 8d0784799eaf..feb912c9e416 100644 --- a/pkg/stanza/operator/persister.go +++ b/pkg/stanza/operator/persister.go @@ -6,6 +6,8 @@ package operator // import "github.com/open-telemetry/opentelemetry-collector-co import ( "context" "fmt" + + "go.opentelemetry.io/collector/extension/experimental/storage" ) // Persister is an interface used to persist data @@ -13,6 +15,7 @@ type Persister interface { Get(context.Context, string) ([]byte, error) Set(context.Context, string, []byte) error Delete(context.Context, string) error + Batch(ctx context.Context, ops ...storage.Operation) error } type scopedPersister struct { @@ -38,3 +41,10 @@ func (p scopedPersister) Set(ctx context.Context, key string, value []byte) erro func (p scopedPersister) Delete(ctx context.Context, key string) error { return p.Persister.Delete(ctx, fmt.Sprintf("%s.%s", p.scope, key)) } + +func (p scopedPersister) Batch(ctx context.Context, ops ...storage.Operation) error { + for _, op := range ops { + op.Key = fmt.Sprintf("%s.%s", p.scope, op.Key) + } + return p.Persister.Batch(ctx, ops...) 
+} diff --git a/pkg/stanza/testutil/util.go b/pkg/stanza/testutil/util.go index b7ee946d8f9e..217dae60ae55 100644 --- a/pkg/stanza/testutil/util.go +++ b/pkg/stanza/testutil/util.go @@ -8,6 +8,8 @@ import ( "strings" "sync" + "go.opentelemetry.io/collector/extension/experimental/storage" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator" ) @@ -46,6 +48,24 @@ func (p *mockPersister) Delete(_ context.Context, k string) error { return nil } +func (p *mockPersister) Batch(_ context.Context, ops ...storage.Operation) error { + var err error + for _, op := range ops { + switch op.Type { + case storage.Get: + op.Value, err = p.Get(context.Background(), op.Key) + case storage.Set: + err = p.Set(context.Background(), op.Key, op.Value) + case storage.Delete: + err = p.Delete(context.Background(), op.Key) + } + if err != nil { + return err + } + } + return nil +} + // NewUnscopedMockPersister will return a new persister for testing func NewUnscopedMockPersister() operator.Persister { data := make(map[string][]byte)