[chore][fileconsumer/archive] - Add archive read logic (#35798)
This PR follows #35098.

### Description

- This PR adds the core logic for matching fingerprints against the archive. Check [this out](#32727 (comment)) for the underlying approach.

### Future PRs

- As of now, we don't keep track of the most recently written archive index across
collector restarts. This is simple to accomplish; we can use the persister for it
(see the sketch after this list). I haven't implemented it in the current PR, as I
want to keep the focus solely on the reading part. We can address this later in
this PR or in a separate PR, independently.
- Testing and enabling: once we establish common ground on the _**reading from
archive**_ logic, we can proceed with testing and enabling the configuration.
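
For illustration, here is a minimal sketch of what persisting the archive index could look like, assuming we reuse the existing `operator.Persister` and store the index under a dedicated key. The key name (`archiveIndexKey`) and the helper functions are hypothetical and not part of this PR:

```go
// Hypothetical sketch, not part of this PR: persist the most recently written
// archive index so a restarted collector resumes at the right slot. The key
// name, encoding, and helpers below are illustrative only.
package tracker

import (
	"context"
	"strconv"

	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator"
)

const archiveIndexKey = "archiveIndex" // illustrative storage key

// saveArchiveIndex stores the index that archive() would write to next.
func saveArchiveIndex(ctx context.Context, persister operator.Persister, index int) error {
	return persister.Set(ctx, archiveIndexKey, []byte(strconv.Itoa(index)))
}

// loadArchiveIndex restores the stored index, returning 0 when nothing was saved yet.
func loadArchiveIndex(ctx context.Context, persister operator.Persister, pollsToArchive int) (int, error) {
	if pollsToArchive <= 0 {
		return 0, nil // archiving disabled; nothing to restore
	}
	b, err := persister.Get(ctx, archiveIndexKey)
	if err != nil || len(b) == 0 {
		return 0, err
	}
	index, err := strconv.Atoi(string(b))
	if err != nil {
		return 0, err
	}
	// Wrap defensively in case pollsToArchive was reduced between runs.
	return index % pollsToArchive, nil
}
```

In this sketch, `loadArchiveIndex` would run once when the tracker is constructed and `saveArchiveIndex` after each write to the archive.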
VihasMakwana authored Dec 9, 2024
1 parent 94d097a commit 1a90009
Showing 4 changed files with 140 additions and 4 deletions.
6 changes: 5 additions & 1 deletion pkg/stanza/fileconsumer/internal/checkpoint/checkpoint.go
@@ -47,7 +47,11 @@ func SaveKey(ctx context.Context, persister operator.Persister, rmds []*reader.M

// Load loads the most recent set of files from the database
func Load(ctx context.Context, persister operator.Persister) ([]*reader.Metadata, error) {
encoded, err := persister.Get(ctx, knownFilesKey)
return LoadKey(ctx, persister, knownFilesKey)
}

func LoadKey(ctx context.Context, persister operator.Persister, key string) ([]*reader.Metadata, error) {
encoded, err := persister.Get(ctx, key)
if err != nil {
return nil, err
}
74 changes: 72 additions & 2 deletions pkg/stanza/fileconsumer/internal/tracker/tracker.go
@@ -31,6 +31,7 @@ type Tracker interface {
EndPoll()
EndConsume() int
TotalReaders() int
FindFiles([]*fingerprint.Fingerprint) []*reader.Metadata
}

// fileTracker tracks known offsets for files that are being consumed by the manager.
@@ -164,13 +165,80 @@ func (t *fileTracker) archive(metadata *fileset.Fileset[*reader.Metadata]) {
if t.pollsToArchive <= 0 || t.persister == nil {
return
}
key := fmt.Sprintf("knownFiles%d", t.archiveIndex)
if err := checkpoint.SaveKey(context.Background(), t.persister, metadata.Get(), key); err != nil {
if err := t.writeArchive(t.archiveIndex, metadata); err != nil {
t.set.Logger.Error("error faced while saving to the archive", zap.Error(err))
}
t.archiveIndex = (t.archiveIndex + 1) % t.pollsToArchive // increment the index
}

// readArchive loads data from the archive for a given index and returns a fileset.Fileset.
func (t *fileTracker) readArchive(index int) (*fileset.Fileset[*reader.Metadata], error) {
key := fmt.Sprintf("knownFiles%d", index)
metadata, err := checkpoint.LoadKey(context.Background(), t.persister, key)
if err != nil {
return nil, err
}
f := fileset.New[*reader.Metadata](len(metadata))
f.Add(metadata...)
return f, nil
}

// writeArchive saves data to the archive for a given index and returns an error, if encountered.
func (t *fileTracker) writeArchive(index int, rmds *fileset.Fileset[*reader.Metadata]) error {
key := fmt.Sprintf("knownFiles%d", index)
return checkpoint.SaveKey(context.Background(), t.persister, rmds.Get(), key)
}

// FindFiles goes through the archive one fileset at a time and tries to match each fingerprint against the loaded set.
func (t *fileTracker) FindFiles(fps []*fingerprint.Fingerprint) []*reader.Metadata {
// To minimize disk access, we load the fileset for one index at a time, check the unmatched fingerprints against it, and update their metadata when a match is found.
// We exit if all fingerprints are matched.

// Track number of matched fingerprints so we can exit if all matched.
var numMatched int

// Determine the index for reading the archive, starting from the most recent and moving towards the oldest
nextIndex := t.archiveIndex
matchedMetadata := make([]*reader.Metadata, len(fps))

// continue executing the loop until either all records are matched or all archive sets have been processed.
for i := 0; i < t.pollsToArchive; i++ {
// Step back to the next most recent index, wrapping around if needed
nextIndex = (nextIndex - 1 + t.pollsToArchive) % t.pollsToArchive

data, err := t.readArchive(nextIndex) // we load each fileset at most once per poll
if err != nil {
t.set.Logger.Error("error while opening archive", zap.Error(err))
continue
}
archiveModified := false
for j, fp := range fps {
if matchedMetadata[j] != nil {
// we've already found a match for this fingerprint; skip it
continue
}
if md := data.Match(fp, fileset.StartsWith); md != nil {
// update the matched metadata for this index
matchedMetadata[j] = md
archiveModified = true
numMatched++
}
}
if !archiveModified {
continue
}
// we save each fileset at most once per poll
if err := t.writeArchive(nextIndex, data); err != nil {
t.set.Logger.Error("error while opening archive", zap.Error(err))
}
// Check if all metadata have been found
if numMatched == len(fps) {
return matchedMetadata
}
}
return matchedMetadata
}

// noStateTracker only tracks the current polled files. Once the poll is
// complete and telemetry is consumed, the tracked files are closed. The next
// poll will create fresh readers with no previously tracked offsets.
@@ -225,3 +293,5 @@ func (t *noStateTracker) ClosePreviousFiles() int { return 0 }
func (t *noStateTracker) EndPoll() {}

func (t *noStateTracker) TotalReaders() int { return 0 }

func (t *noStateTracker) FindFiles([]*fingerprint.Fingerprint) []*reader.Metadata { return nil }
62 changes: 62 additions & 0 deletions pkg/stanza/fileconsumer/internal/tracker/tracker_test.go
@@ -0,0 +1,62 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package tracker // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/tracker"

import (
"context"
"math/rand/v2"
"testing"

"github.com/google/uuid"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component/componenttest"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/checkpoint"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/fingerprint"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/reader"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/testutil"
)

func TestFindFilesOrder(t *testing.T) {
fps := make([]*fingerprint.Fingerprint, 0)
for i := 0; i < 100; i++ {
fps = append(fps, fingerprint.New([]byte(uuid.NewString())))
}
persister := testutil.NewUnscopedMockPersister()
fpInStorage := populatedPersisterData(persister, fps)

tracker := NewFileTracker(componenttest.NewNopTelemetrySettings(), 0, 100, persister)
matchables := tracker.FindFiles(fps)

require.Equal(t, len(fps), len(matchables), "return slice should be of same length as input slice")

for i := 0; i < len(matchables); i++ {
if fpInStorage[i] {
// if current fingerprint is present in storage, the corresponding return value should not be nil
require.NotNilf(t, matchables[i], "resulting index %d should not be nil", i)
require.Truef(t, fps[i].Equal(matchables[i].GetFingerprint()), "fingerprint at index %d is not equal to corresponding return value", i)
} else {
// if current fingerprint is absent from storage, the corresponding return value should be nil
require.Nil(t, matchables[i], "resulting index %d should be nil", i)
}
}
}

func populatedPersisterData(persister operator.Persister, fps []*fingerprint.Fingerprint) []bool {
md := make([]*reader.Metadata, 0)

fpInStorage := make([]bool, len(fps))
for i, fp := range fps {
// 50-50 chance that a fingerprint exists in the storage
if rand.Float32() < 0.5 {
md = append(md, &reader.Metadata{Fingerprint: fp})
fpInStorage[i] = true // mark the fingerprint at index i in storage
}
}
// save half of the keys in knownFiles0 and the other half in knownFiles1
_ = checkpoint.SaveKey(context.Background(), persister, md[:len(md)/2], "knownFiles0")
_ = checkpoint.SaveKey(context.Background(), persister, md[len(md)/2:], "knownFiles1")
return fpInStorage
}
2 changes: 1 addition & 1 deletion pkg/stanza/go.mod
@@ -8,6 +8,7 @@ require (
github.com/expr-lang/expr v1.16.9
github.com/fsnotify/fsnotify v1.8.0
github.com/goccy/go-json v0.10.3
github.com/google/uuid v1.6.0
github.com/jonboulle/clockwork v0.4.0
github.com/jpillora/backoff v1.0.0
github.com/json-iterator/go v1.1.12
@@ -50,7 +51,6 @@ require (
github.com/go-logr/stdr v1.2.2 // indirect
github.com/go-viper/mapstructure/v2 v2.2.1 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/hashicorp/go-version v1.7.0 // indirect
github.com/knadh/koanf/maps v0.1.1 // indirect
github.com/knadh/koanf/providers/confmap v0.1.0 // indirect
