Skip to content

Commit

Permalink
[filebeat][azure-blob-storage] - Simplified state checkpoint calculat…
Browse files Browse the repository at this point in the history
…ion (elastic#40936)
  • Loading branch information
ShourieG authored Oct 2, 2024
1 parent 4184106 commit e345f28
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 34 deletions.
1 change: 1 addition & 0 deletions CHANGELOG-developer.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,7 @@ The list below covers the major changes between 7.0.0-rc2 and main only.
- Add a configuration option for TCP/UDP network type. {issue}40407[40407] {pull}40623[40623]
- Added debug logging to parquet reader in x-pack/libbeat/reader. {pull}40651[40651]
- Added filebeat debug histograms for s3 object size and events per processed s3 object. {pull}40775[40775]
- Simplified Azure Blob Storage input state checkpoint calculation logic. {issue}40674[40674] {pull}40936[40936]

==== Deprecated

Expand Down
48 changes: 14 additions & 34 deletions x-pack/filebeat/input/azureblobstorage/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ package azureblobstorage
import (
"context"
"fmt"
"slices"
"sort"
"sync"

azruntime "github.com/Azure/azure-sdk-for-go/sdk/azcore/runtime"
Expand Down Expand Up @@ -190,41 +192,19 @@ func (s *scheduler) fetchBlobPager(batchSize int32) *azruntime.Pager[azblob.List
// moveToLastSeenJob, moves to the latest job position past the last seen job
// Jobs are stored in lexicographical order always, hence the latest position can be found either on the basis of job name or timestamp
func (s *scheduler) moveToLastSeenJob(jobs []*job) []*job {
var latestJobs []*job
jobsToReturn := make([]*job, 0)
counter := 0
flag := false
ignore := false

for _, job := range jobs {
switch {
case job.timestamp().After(s.state.checkpoint().LatestEntryTime):
latestJobs = append(latestJobs, job)
case job.name() == s.state.checkpoint().BlobName:
flag = true
case job.name() > s.state.checkpoint().BlobName:
flag = true
counter--
case job.name() <= s.state.checkpoint().BlobName && (!ignore):
ignore = true
}
counter++
}

if flag && (counter < len(jobs)-1) {
jobsToReturn = jobs[counter+1:]
} else if !flag && !ignore {
jobsToReturn = jobs
}

// in a senario where there are some jobs which have a greater timestamp
// but lesser alphanumeric order and some jobs have greater alphanumeric order
// than the current checkpoint blob name, then we append the latest jobs
if len(jobsToReturn) != len(jobs) && len(latestJobs) > 0 {
jobsToReturn = append(latestJobs, jobsToReturn...)
}
cp := s.state.checkpoint()
jobs = slices.DeleteFunc(jobs, func(j *job) bool {
return !(j.timestamp().After(cp.LatestEntryTime) || j.name() > cp.BlobName)
})

return jobsToReturn
// In a scenario where there are some jobs which have a greater timestamp
// but lesser lexicographic order and some jobs have greater lexicographic order
// than the current checkpoint blob name, we then sort around the pivot checkpoint
// timestamp.
sort.SliceStable(jobs, func(i, _ int) bool {
return jobs[i].timestamp().After(cp.LatestEntryTime)
})
return jobs
}

func (s *scheduler) isFileSelected(name string) bool {
Expand Down

0 comments on commit e345f28

Please sign in to comment.