Commit 5d5f12d

chore: archive restoration
VihasMakwana committed Dec 20, 2024
1 parent c8171a4 commit 5d5f12d
Showing 1 changed file with 131 additions and 33 deletions.
164 changes: 131 additions & 33 deletions pkg/stanza/fileconsumer/internal/tracker/tracker.go
@@ -4,8 +4,9 @@
package tracker // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/tracker"

import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"

@@ -58,6 +59,7 @@ type fileTracker struct {
var errInvalidValue = errors.New("invalid value")

var archiveIndexKey = "knownFilesArchiveIndex"
var archivePollsToArchiveKey = "knownFilesPollsToArchive"

func NewFileTracker(set component.TelemetrySettings, maxBatchFiles int, pollsToArchive int, persister operator.Persister) Tracker {
knownFiles := make([]*fileset.Fileset[*reader.Metadata], 3)
@@ -76,7 +78,9 @@ func NewFileTracker(set component.TelemetrySettings, maxBatchFiles int, pollsToA
persister: persister,
archiveIndex: 0,
}
if t.archiveEnabled() {
t.restoreArchiveIndex(context.Background())
}

return t
}
@@ -142,7 +146,9 @@ func (t *fileTracker) EndPoll() {
// t.knownFiles[0] -> t.knownFiles[1] -> t.knownFiles[2]

// Instead of throwing it away, archive it.
if t.archiveEnabled() {
t.archive(t.knownFiles[2])
}
copy(t.knownFiles[1:], t.knownFiles)
t.knownFiles[0] = fileset.New[*reader.Metadata](t.maxBatchFiles)
}
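
For illustration, a minimal standalone sketch of the generation shift above (hypothetical values, not part of this commit): the copy moves each fileset one generation to the right, the oldest generation falls off the end (and is archived first), and slot 0 is reset for the next poll.

package main

import "fmt"

func main() {
    known := []string{"gen0", "gen1", "gen2"} // gen2 is the oldest poll
    oldest := known[2]                        // archived before the shift
    copy(known[1:], known)                    // shift right; copy handles the overlap
    known[0] = "fresh"                        // new fileset for the upcoming poll
    fmt.Println(oldest, known)                // gen2 [fresh gen0 gen1]
}
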
@@ -155,24 +161,108 @@ func (t *fileTracker) TotalReaders() int {
return total
}

func (t *fileTracker) restoreArchiveIndex(ctx context.Context) {
byteIndex, err := t.persister.Get(ctx, archiveIndexKey)
if err != nil {
t.set.Logger.Error("error while reading the archiveIndexKey. Starting from 0", zap.Error(err))
t.archiveIndex = 0
return
}

previousPollsToArchive, err := t.previousPollsToArchive(ctx)
if err != nil {
// if there's an error reading previousPollsToArchive, default to current value
previousPollsToArchive = t.pollsToArchive
}

t.archiveIndex, err = decodeIndex(byteIndex)
if err != nil {
t.set.Logger.Error("error getting read index. Starting from 0", zap.Error(err))
} else if previousPollsToArchive < t.pollsToArchive {
// if the archive size has increased, point the index just past the previously used region
t.archiveIndex = previousPollsToArchive
} else if previousPollsToArchive > t.pollsToArchive {
// we will only attempt to rewrite the archive if the archive size has shrunk
t.set.Logger.Warn("polls_to_archive has changed. Will attempt to rewrite archive")
t.rewriteArchive(ctx, previousPollsToArchive)
}

t.removeExtraKeys(ctx)

// store current pollsToArchive
if err := t.persister.Set(ctx, archivePollsToArchiveKey, encodeIndex(t.pollsToArchive)); err != nil {
t.set.Logger.Error("Error storing polls_to_archive", zap.Error(err))
}
}
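
The branches above cover three cases for a restart after `polls_to_archive` changed. A simplified standalone sketch of just that decision (the helper name and values are hypothetical, not part of the tracker API):

package main

import "fmt"

// nextArchiveIndex mirrors the branch logic in restoreArchiveIndex with plain ints.
func nextArchiveIndex(stored, previous, current int) int {
    switch {
    case previous < current: // archive grew: write just past the old region
        return previous
    case previous > current: // archive shrank: rewriteArchive compacts; the index may restart at 0
        return 0
    default: // size unchanged: resume where we left off
        return stored
    }
}

func main() {
    fmt.Println(nextArchiveIndex(2, 4, 6)) // 4: grew from 4 to 6 slots
    fmt.Println(nextArchiveIndex(2, 6, 4)) // 0: shrank from 6 to 4 slots
    fmt.Println(nextArchiveIndex(2, 4, 4)) // 2: unchanged
}
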

func (t *fileTracker) rewriteArchive(ctx context.Context, previousPollsToArchive int) {
if t.archiveIndex < 0 {
// safety check.
return
}
swapData := func(idx1, idx2 int) error {
val1, err := t.persister.Get(ctx, archiveKey(idx1))
if err != nil {
return err
}
val2, err := t.persister.Get(ctx, archiveKey(idx2))
if err != nil {
return err
}
return t.persister.Batch(ctx, storage.SetOperation(archiveKey(idx1), val2), storage.SetOperation(archiveKey(idx2), val1))
}

leastRecentIndex := mod(t.archiveIndex-t.pollsToArchive, previousPollsToArchive)

// Pick the copy direction based on where the current index sits relative to the least recent entry we need to keep
if t.archiveIndex >= leastRecentIndex {
// start from least recent
for i := 0; i < t.pollsToArchive; i++ {
if err := swapData(i, leastRecentIndex); err != nil {
t.set.Logger.Warn("error while swapping archive", zap.Error(err))
}
leastRecentIndex = (leastRecentIndex + 1) % previousPollsToArchive
}
t.archiveIndex = 0
} else {
// start from most recent
count := previousPollsToArchive - leastRecentIndex
if val, _ := t.persister.Get(ctx, archiveKey(t.archiveIndex)); val == nil {
// if current index points at unset key, no need to do anything
return
}
for i := t.archiveIndex; i < count; i++ {
if err := swapData(i, leastRecentIndex); err != nil {
t.set.Logger.Warn("error while swapping archive", zap.Error(err))
}
leastRecentIndex = (leastRecentIndex + 1) % previousPollsToArchive
}
}
}
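
The least recent index is computed with the wrap-safe mod helper defined at the bottom of this file, since archiveIndex - pollsToArchive can be negative. A worked example with assumed sizes (shrinking from 5 to 3 slots, current index 4):

package main

import "fmt"

// mod mirrors the wrap-safe helper below; it keeps the result non-negative
// for x in (-y, y), unlike Go's % operator.
func mod(x, y int) int { return (x + y) % y }

func main() {
    previous, current, archiveIndex := 5, 3, 4 // hypothetical sizes
    leastRecent := mod(archiveIndex-current, previous)
    fmt.Println(leastRecent) // 1: slots 1, 2, 3 hold the three newest polls
    fmt.Println(mod(-2, 5))  // 3: plain -2 % 5 would yield -2
}
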

func (t *fileTracker) removeExtraKeys(ctx context.Context) {
for i := t.pollsToArchive; ; i++ {
if val, err := t.persister.Get(ctx, archiveKey(i)); err != nil || val == nil {
break
}
if err := t.persister.Delete(ctx, archiveKey(i)); err != nil {
t.set.Logger.Error("error while cleaning extra keys", zap.Error(err))
}
}
}

func (t *fileTracker) previousPollsToArchive(ctx context.Context) (int, error) {
byteIndex, err := t.persister.Get(ctx, archivePollsToArchiveKey)
if err != nil {
t.set.Logger.Error("error while reading the archivePollsToArchiveKey", zap.Error(err))
return 0, err
}
previousPollsToArchive, err := decodeIndex(byteIndex)
if err != nil {
t.set.Logger.Error("error while decoding previousPollsToArchive", zap.Error(err))
return 0, err
}
return previousPollsToArchive, nil
}

func (t *fileTracker) archive(metadata *fileset.Fileset[*reader.Metadata]) {
@@ -193,21 +283,17 @@ func (t *fileTracker) archive(metadata *fileset.Fileset[*reader.Metadata]) {
// start
// index

index := t.archiveIndex
t.archiveIndex = (t.archiveIndex + 1) % t.pollsToArchive // increment the index
indexOp := storage.SetOperation(archiveIndexKey, encodeIndex(t.archiveIndex)) // batch the updated index with metadata
if err := t.writeArchive(index, metadata, indexOp); err != nil {
t.set.Logger.Error("error faced while saving to the archive", zap.Error(err))
}
}
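
The index arithmetic above makes the archive a ring buffer over pollsToArchive storage keys. A minimal sketch of the key rotation (illustrative only, with pollsToArchive = 3):

package main

import "fmt"

func main() {
    pollsToArchive, index := 3, 0
    for poll := 1; poll <= 5; poll++ {
        key := fmt.Sprintf("knownFiles%d", index) // same scheme as archiveKey
        index = (index + 1) % pollsToArchive      // wrap after the last slot
        fmt.Println(poll, key)
    }
    // poll 4 overwrites knownFiles0, poll 5 overwrites knownFiles1
}
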

// readArchive loads data from the archive for a given index and returns a fileset.Fileset.
func (t *fileTracker) readArchive(index int) (*fileset.Fileset[*reader.Metadata], error) {
metadata, err := checkpoint.LoadKey(context.Background(), t.persister, archiveKey(index))
if err != nil {
return nil, err
}
@@ -218,8 +304,7 @@ func (t *fileTracker) writeArchive(index int, rmds *fileset.Fileset[*reader.Metadata]

// writeArchive saves data to the archive for a given index and returns an error, if encountered.
func (t *fileTracker) writeArchive(index int, rmds *fileset.Fileset[*reader.Metadata], ops ...storage.Operation) error {
return checkpoint.SaveKey(context.Background(), t.persister, rmds.Get(), archiveKey(index), ops...)
}

func (t *fileTracker) archiveEnabled() bool {
@@ -333,17 +418,30 @@ func (t *noStateTracker) TotalReaders() int { return 0 }

func (t *noStateTracker) FindFiles([]*fingerprint.Fingerprint) []*reader.Metadata { return nil }

func encodeIndex(val int) []byte {
var buf bytes.Buffer
enc := json.NewEncoder(&buf)

// Encode the index
if err := enc.Encode(val); err != nil {
return nil
}
return buf.Bytes()
}

func decodeIndex(buf []byte) (int, error) {
var index int

// Decode the index
dec := json.NewDecoder(bytes.NewReader(buf))
err := dec.Decode(&index)
return max(index, 0), err
}
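
A standalone round-trip of the two helpers above (same shape, trimmed of the tracker plumbing; assumes Go 1.21+ for the built-in max). Note that json.Encoder appends a newline, and that an empty value decodes to a clamped 0 alongside io.EOF:

package main

import (
    "bytes"
    "encoding/json"
    "fmt"
)

func encodeIndex(val int) []byte {
    var buf bytes.Buffer
    if err := json.NewEncoder(&buf).Encode(val); err != nil {
        return nil
    }
    return buf.Bytes()
}

func decodeIndex(buf []byte) (int, error) {
    var index int
    err := json.NewDecoder(bytes.NewReader(buf)).Decode(&index)
    return max(index, 0), err // clamp negatives, mirroring the original
}

func main() {
    b := encodeIndex(7)
    fmt.Printf("%q\n", b)         // "7\n"
    fmt.Println(decodeIndex(b))   // 7 <nil>
    fmt.Println(decodeIndex(nil)) // 0 EOF
}
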

func archiveKey(i int) string {
return fmt.Sprintf("knownFiles%d", i)
}

func mod(x, y int) int {
return (x + y) % y
}
