diff --git a/pkg/stanza/fileconsumer/internal/tracker/tracker.go b/pkg/stanza/fileconsumer/internal/tracker/tracker.go
index b4fa65344fc7..1d90ccd25c97 100644
--- a/pkg/stanza/fileconsumer/internal/tracker/tracker.go
+++ b/pkg/stanza/fileconsumer/internal/tracker/tracker.go
@@ -4,8 +4,9 @@ package tracker // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/tracker"
 
 import (
+    "bytes"
     "context"
-    "encoding/binary"
+    "encoding/json"
     "errors"
     "fmt"
 
@@ -58,6 +59,7 @@ type fileTracker struct {
 
 var errInvalidValue = errors.New("invalid value")
 var archiveIndexKey = "knownFilesArchiveIndex"
+var archivePollsToArchiveKey = "knownFilesPollsToArchive"
 
 func NewFileTracker(set component.TelemetrySettings, maxBatchFiles int, pollsToArchive int, persister operator.Persister) Tracker {
     knownFiles := make([]*fileset.Fileset[*reader.Metadata], 3)
@@ -76,7 +78,9 @@ func NewFileTracker(set component.TelemetrySettings, maxBatchFiles int, pollsToA
         persister:      persister,
         archiveIndex:   0,
     }
-    t.restoreArchiveIndex()
+    if t.archiveEnabled() {
+        t.restoreArchiveIndex(context.Background())
+    }
 
     return t
 }
@@ -142,7 +146,9 @@ func (t *fileTracker) EndPoll() {
     // t.knownFiles[0] -> t.knownFiles[1] -> t.knownFiles[2]
 
     // Instead of throwing it away, archive it.
-    t.archive(t.knownFiles[2])
+    if t.archiveEnabled() {
+        t.archive(t.knownFiles[2])
+    }
     copy(t.knownFiles[1:], t.knownFiles)
     t.knownFiles[0] = fileset.New[*reader.Metadata](t.maxBatchFiles)
 }
@@ -155,24 +161,108 @@ func (t *fileTracker) TotalReaders() int {
     return total
 }
 
-func (t *fileTracker) restoreArchiveIndex() {
-    if !t.archiveEnabled() {
-        return
-    }
-    byteIndex, err := t.persister.Get(context.Background(), archiveIndexKey)
+func (t *fileTracker) restoreArchiveIndex(ctx context.Context) {
+    byteIndex, err := t.persister.Get(ctx, archiveIndexKey)
     if err != nil {
         t.set.Logger.Error("error while reading the archiveIndexKey. Starting from 0", zap.Error(err))
+        t.archiveIndex = 0
         return
     }
-    t.archiveIndex, err = byteToIndex(byteIndex)
+
+    previousPollsToArchive, err := t.previousPollsToArchive(ctx)
+    if err != nil {
+        // if there's an error reading previousPollsToArchive, default to the current value
+        previousPollsToArchive = t.pollsToArchive
+    }
+
+    t.archiveIndex, err = decodeIndex(byteIndex)
     if err != nil {
         t.set.Logger.Error("error getting read index. Starting from 0", zap.Error(err))
-    } else if t.archiveIndex < 0 || t.archiveIndex >= t.pollsToArchive {
-        // safety check. It can happen if `polls_to_archive` was changed.
-        // It's best if we reset the index or else we might end up writing invalid keys
-        t.set.Logger.Warn("the read index was found, but it exceeds the bounds. Starting from 0")
+    } else if previousPollsToArchive < t.pollsToArchive {
+        // if the archive size has increased, we just point the archive index at the end
+        t.archiveIndex = previousPollsToArchive
+    } else if previousPollsToArchive > t.pollsToArchive {
+        // we will only attempt to rewrite the archive if its size has shrunk
+        t.set.Logger.Warn("polls_to_archive has changed. Will attempt to rewrite archive")
+        t.rewriteArchive(ctx, previousPollsToArchive)
+    }
+
+    t.removeExtraKeys(ctx)
+
+    // store current pollsToArchive
+    if err := t.persister.Set(ctx, archivePollsToArchiveKey, encodeIndex(t.pollsToArchive)); err != nil {
+        t.set.Logger.Error("error storing polls_to_archive", zap.Error(err))
+    }
+}
+
+func (t *fileTracker) rewriteArchive(ctx context.Context, previousPollsToArchive int) {
+    if t.archiveIndex < 0 {
+        // safety check: a negative index means there is nothing valid to rewrite
+        return
+    }
+    swapData := func(idx1, idx2 int) error {
+        val1, err := t.persister.Get(ctx, archiveKey(idx1))
+        if err != nil {
+            return err
+        }
+        val2, err := t.persister.Get(ctx, archiveKey(idx2))
+        if err != nil {
+            return err
+        }
+        return t.persister.Batch(ctx, storage.SetOperation(archiveKey(idx1), val2), storage.SetOperation(archiveKey(idx2), val1))
+    }
+
+    leastRecentIndex := mod(t.archiveIndex-t.pollsToArchive, previousPollsToArchive)
+
+    // Pick the starting point and direction of the rewrite based on where the
+    // current index sits relative to the least recent entry
+    if t.archiveIndex >= leastRecentIndex {
+        // start from the least recent entry
+        for i := 0; i < t.pollsToArchive; i++ {
+            if err := swapData(i, leastRecentIndex); err != nil {
+                t.set.Logger.Warn("error while swapping archive", zap.Error(err))
+            }
+            leastRecentIndex = (leastRecentIndex + 1) % previousPollsToArchive
+        }
         t.archiveIndex = 0
+    } else {
+        // start from the most recent entry
+        count := previousPollsToArchive - leastRecentIndex
+        if val, _ := t.persister.Get(ctx, archiveKey(t.archiveIndex)); val == nil {
+            // if the current index points at an unset key, there is nothing to do
+            return
+        }
+        for i := t.archiveIndex; i < count; i++ {
+            if err := swapData(i, leastRecentIndex); err != nil {
+                t.set.Logger.Warn("error while swapping archive", zap.Error(err))
+            }
+            leastRecentIndex = (leastRecentIndex + 1) % previousPollsToArchive
+        }
+    }
+}
+
+func (t *fileTracker) removeExtraKeys(ctx context.Context) {
+    for i := t.pollsToArchive; ; i++ {
+        if val, err := t.persister.Get(ctx, archiveKey(i)); err != nil || val == nil {
+            break
+        }
+        if err := t.persister.Delete(ctx, archiveKey(i)); err != nil {
+            t.set.Logger.Error("error while cleaning extra keys", zap.Error(err))
+        }
+    }
+}
+
+func (t *fileTracker) previousPollsToArchive(ctx context.Context) (int, error) {
+    byteIndex, err := t.persister.Get(ctx, archivePollsToArchiveKey)
+    if err != nil {
+        t.set.Logger.Error("error while reading the archivePollsToArchiveKey", zap.Error(err))
+        return 0, err
+    }
+    previousPollsToArchive, err := decodeIndex(byteIndex)
+    if err != nil {
+        t.set.Logger.Error("error while decoding previousPollsToArchive", zap.Error(err))
+        return 0, err
     }
+    return previousPollsToArchive, nil
 }
 
 func (t *fileTracker) archive(metadata *fileset.Fileset[*reader.Metadata]) {
@@ -193,12 +283,9 @@ func (t *fileTracker) archive(metadata *fileset.Fileset[*reader.Metadata]) {
     //                                     start
     //                                       index
 
-    if !t.archiveEnabled() {
-        return
-    }
     index := t.archiveIndex
-    t.archiveIndex = (t.archiveIndex + 1) % t.pollsToArchive                    // increment the index
-    indexOp := storage.SetOperation(archiveIndexKey, intToByte(t.archiveIndex)) // batch the updated index with metadata
+    t.archiveIndex = (t.archiveIndex + 1) % t.pollsToArchive                      // increment the index
+    indexOp := storage.SetOperation(archiveIndexKey, encodeIndex(t.archiveIndex)) // batch the updated index with metadata
     if err := t.writeArchive(index, metadata, indexOp); err != nil {
         t.set.Logger.Error("error faced while saving to the archive", zap.Error(err))
     }
@@ -206,8 +293,7 @@ func (t *fileTracker) archive(metadata *fileset.Fileset[*reader.Metadata]) {
 
 // readArchive loads data from the archive for a given index and returns a fileset.Filset.
 func (t *fileTracker) readArchive(index int) (*fileset.Fileset[*reader.Metadata], error) {
-    key := fmt.Sprintf("knownFiles%d", index)
-    metadata, err := checkpoint.LoadKey(context.Background(), t.persister, key)
+    metadata, err := checkpoint.LoadKey(context.Background(), t.persister, archiveKey(index))
     if err != nil {
         return nil, err
     }
@@ -218,8 +304,7 @@ func (t *fileTracker) readArchive(index int) (*fileset.Fileset[*reader.Metadata]
 
 // writeArchive saves data to the archive for a given index and returns an error, if encountered.
 func (t *fileTracker) writeArchive(index int, rmds *fileset.Fileset[*reader.Metadata], ops ...storage.Operation) error {
-    key := fmt.Sprintf("knownFiles%d", index)
-    return checkpoint.SaveKey(context.Background(), t.persister, rmds.Get(), key, ops...)
+    return checkpoint.SaveKey(context.Background(), t.persister, rmds.Get(), archiveKey(index), ops...)
 }
 
 func (t *fileTracker) archiveEnabled() bool {
@@ -333,17 +418,30 @@ func (t *noStateTracker) TotalReaders() int { return 0 }
 
 func (t *noStateTracker) FindFiles([]*fingerprint.Fingerprint) []*reader.Metadata { return nil }
 
-func intToByte(val int) []byte {
-    return binary.LittleEndian.AppendUint64([]byte{}, uint64(val))
-}
+func encodeIndex(val int) []byte {
+    var buf bytes.Buffer
+    enc := json.NewEncoder(&buf)
 
-func byteToIndex(buf []byte) (int, error) {
-    if buf == nil {
-        return 0, nil
+    // Encode the index
+    if err := enc.Encode(val); err != nil {
+        return nil
     }
-    // The sizeof uint64 in binary is 8.
-    if len(buf) < 8 {
-        return 0, errInvalidValue
-    }
-    return int(binary.LittleEndian.Uint64(buf)), nil
+    return buf.Bytes()
+}
+
+func decodeIndex(buf []byte) (int, error) {
+    var index int
+
+    // Decode the index
+    dec := json.NewDecoder(bytes.NewReader(buf))
+    err := dec.Decode(&index)
+    return max(index, 0), err
+}
+
+func archiveKey(i int) string {
+    return fmt.Sprintf("knownFiles%d", i)
+}
+
+func mod(x, y int) int {
+    return (x + y) % y
 }
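
A note on the new index encoding: the patch replaces the fixed-width binary.LittleEndian representation with JSON, so the persisted value is human-readable and a decode failure simply falls back to index 0. The standalone sketch below (not part of the patch; the main wrapper and the sample value 7 are only for illustration) shows how the helpers round-trip an index and how archiveKey maps it to a storage key:

package main

import (
    "bytes"
    "encoding/json"
    "fmt"
)

// encodeIndex and decodeIndex mirror the helpers introduced in the patch:
// the archive index is persisted as JSON instead of a fixed-width binary value.
func encodeIndex(val int) []byte {
    var buf bytes.Buffer
    if err := json.NewEncoder(&buf).Encode(val); err != nil {
        return nil
    }
    return buf.Bytes()
}

func decodeIndex(buf []byte) (int, error) {
    var index int
    err := json.NewDecoder(bytes.NewReader(buf)).Decode(&index)
    return max(index, 0), err // clamp negative values so the index never goes out of range
}

func archiveKey(i int) string {
    return fmt.Sprintf("knownFiles%d", i)
}

func main() {
    b := encodeIndex(7)
    fmt.Printf("stored bytes: %q\n", b) // "7\n"

    i, err := decodeIndex(b)
    fmt.Println(i, err, archiveKey(i)) // 7 <nil> knownFiles7

    // An empty value fails to decode; the tracker logs the error and starts from index 0.
    _, err = decodeIndex(nil)
    fmt.Println(err != nil) // true
}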
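The rewrite path also relies on the small mod helper because Go's % operator can return a negative remainder. A quick illustration with made-up numbers (10 previous polls, 7 new, write position 3; none of these values come from the patch):

package main

import "fmt"

// mod mirrors the helper from the patch: unlike Go's %, it returns a
// non-negative result for any x >= -y, which is what the index math needs.
func mod(x, y int) int {
    return (x + y) % y
}

func main() {
    // Hypothetical scenario: the archive ring previously had 10 slots, the next
    // write position (archiveIndex) is 3, and polls_to_archive was lowered to 7.
    previousPollsToArchive, pollsToArchive, archiveIndex := 10, 7, 3

    // Plain % yields a negative offset here; mod keeps it inside [0, previous).
    fmt.Println((archiveIndex - pollsToArchive) % previousPollsToArchive) // -4
    fmt.Println(mod(archiveIndex-pollsToArchive, previousPollsToArchive)) // 6

    // Slot 6 is the least recent entry still worth keeping: slots 6..9 and 0..2
    // make up the 7 most recent polls in the old ring of size 10.
}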