Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Debug: write memory profile if heap exceeds threshold #819

Merged
merged 2 commits into from
Sep 24, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 48 additions & 28 deletions build/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
"time"

"github.com/bmatcuk/doublestar"
"github.com/dustin/go-humanize"
"github.com/go-enry/go-enry/v2"
"github.com/rs/xid"

Expand Down Expand Up @@ -88,9 +89,6 @@ type Options struct {
// If set, ctags must succeed.
CTagsMustSucceed bool

// Write memory profiles to this file.
MemProfile string

// LargeFiles is a slice of glob patterns, including ** for any number
// of directories, where matching file paths should be indexed
// regardless of their size. The full pattern syntax is here:
Expand Down Expand Up @@ -120,6 +118,12 @@ type Options struct {
// ShardMerging is true if builder should respect compound shards. This is a
// Sourcegraph specific option.
ShardMerging bool

// HeapProfileTriggerBytes is the heap usage in bytes that will trigger a memory profile. If 0, no memory profile will be triggered.
// Profiles will be written to files named `index-memory.prof.n` in the index directory. No more than 10 files are written.
//
// Note: heap checking is "best effort", and it's possible for the process to OOM without triggering the heap profile.
HeapProfileTriggerBytes uint64
}

// HashOptions contains only the options in Options that upon modification leads to IndexState of IndexStateMismatch during the next index building.
Expand Down Expand Up @@ -194,7 +198,6 @@ func (o *Options) Flags(fs *flag.FlagSet) {
fs.StringVar(&o.IndexDir, "index", x.IndexDir, "directory for search indices")
fs.BoolVar(&o.CTagsMustSucceed, "require_ctags", x.CTagsMustSucceed, "If set, ctags calls must succeed.")
fs.Var(largeFilesFlag{o}, "large_file", "A glob pattern where matching files are to be index regardless of their size. You can add multiple patterns by setting this more than once.")
fs.StringVar(&o.MemProfile, "memprofile", "", "write memory profile(s) to `file.shardnum`. Note: sets parallelism to 1.")

// Sourcegraph specific
fs.BoolVar(&o.DisableCTags, "disable_ctags", x.DisableCTags, "If set, ctags will not be called.")
Expand Down Expand Up @@ -270,6 +273,10 @@ type Builder struct {
// indexTime is set by tests for doing reproducible builds.
indexTime time.Time

// heapProfileMu is used to ensure that only one memory profile is written at a time
heapProfileMu sync.Mutex
heapProfileNum int

// a sortable 20 chars long id.
id string

Expand Down Expand Up @@ -835,7 +842,7 @@ func (b *Builder) flush() error {
shard := b.nextShardNum
b.nextShardNum++

if b.opts.Parallelism > 1 && b.opts.MemProfile == "" {
if b.opts.Parallelism > 1 {
b.building.Add(1)
b.throttle <- 1
go func() {
Expand All @@ -860,35 +867,13 @@ func (b *Builder) flush() error {
if err == nil {
b.finishedShards[done.temp] = done.final
}
if b.opts.MemProfile != "" {
// drop memory, and profile.
todo = nil
b.writeMemProfile(b.opts.MemProfile)
}

return b.buildError
}

return nil
}

var profileNumber int

func (b *Builder) writeMemProfile(name string) {
nm := fmt.Sprintf("%s.%d", name, profileNumber)
profileNumber++
f, err := os.Create(nm)
if err != nil {
log.Fatal("could not create memory profile: ", err)
}
runtime.GC() // get up-to-date statistics
if err := pprof.WriteHeapProfile(f); err != nil {
log.Fatal("could not write memory profile: ", err)
}
f.Close()
log.Printf("wrote mem profile %q", nm)
}

// map [0,inf) to [0,1) monotonically
func squashRange(j int) float64 {
x := float64(j)
Expand Down Expand Up @@ -1011,15 +996,50 @@ func (b *Builder) buildShard(todo []*zoekt.Document, nextShardNum int) (*finishe

sortDocuments(todo)

for _, t := range todo {
for idx, t := range todo {
if err := shardBuilder.Add(*t); err != nil {
return nil, err
}

if idx%10_000 == 0 {
b.CheckMemoryUsage()
}
Comment on lines +1004 to +1006
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we check based on doc count and not, for example, time based? If it's time based we wouldn't have to check in two places?

Copy link
Member Author

@jtibshirani jtibshirani Sep 24, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good question! That approach could definitely work. What I liked about this: it gives fine-grained control over when we check memory. So we can check exactly when we are most concerned about an impending OOM, to maximize the chance we'll get useful data. In fact I just added another check after my latest round of research into memory usage (I found that loading the list of files to index can be very expensive).

}

return b.writeShard(name, shardBuilder)
}

// CheckMemoryUsage checks the memory usage of the process and writes a memory profile if the heap usage exceeds the
// configured threshold. NOTE: this method is expensive and should only be used for debugging.
func (b *Builder) CheckMemoryUsage() {
// Don't check memory if heap profiling is disabled, or we've already written 10 profiles
if b.opts.HeapProfileTriggerBytes <= 0 || b.heapProfileNum >= 10 {
return
}

var m runtime.MemStats
runtime.ReadMemStats(&m)

if m.HeapAlloc > b.opts.HeapProfileTriggerBytes && b.heapProfileMu.TryLock() {
defer b.heapProfileMu.Unlock()

log.Printf("writing memory profile, heap usage: %s", humanize.Bytes(m.HeapAlloc))
name := filepath.Join(b.opts.IndexDir, fmt.Sprintf("indexmemory.prof.%d", b.heapProfileNum))
f, err := os.Create(name)
if err != nil {
log.Printf("failed to create memory profile file: %v", err)
return
}

err = pprof.WriteHeapProfile(f)
if err != nil {
log.Printf("failed to write memory profile: %v", err)
}

b.heapProfileNum++
}
}

func (b *Builder) newShardBuilder() (*zoekt.IndexBuilder, error) {
desc := b.opts.RepositoryDescription
desc.HasSymbols = !b.opts.DisableCTags && b.opts.CTagsPath != ""
Expand Down
19 changes: 15 additions & 4 deletions cmd/zoekt-git-index/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"runtime/pprof"
"strings"

"github.com/dustin/go-humanize"
"github.com/sourcegraph/zoekt/internal/profiler"
"go.uber.org/automaxprocs/maxprocs"

Expand All @@ -31,8 +32,6 @@ import (
)

func run() int {
cpuprofile := flag.String("cpuprofile", "", "write cpu profile to `file`")

allowMissing := flag.Bool("allow_missing_branches", false, "allow missing branches.")
submodules := flag.Bool("submodules", true, "if set to false, do not recurse into submodules")
branchesStr := flag.String("branches", "HEAD", "git branches to index.")
Expand All @@ -47,13 +46,16 @@ func run() int {
offlineRanking := flag.String("offline_ranking", "", "the name of the file that contains the ranking info.")
offlineRankingVersion := flag.String("offline_ranking_version", "", "a version string identifying the contents in offline_ranking.")
languageMap := flag.String("language_map", "", "a mapping between a language and its ctags processor (a:0,b:3).")

cpuProfile := flag.String("cpuprofile", "", "write cpu profile to `file`")

flag.Parse()

// Tune GOMAXPROCS to match Linux container CPU quota.
_, _ = maxprocs.Set()

if *cpuprofile != "" {
f, err := os.Create(*cpuprofile)
if *cpuProfile != "" {
f, err := os.Create(*cpuProfile)
if err != nil {
log.Fatal("could not create CPU profile: ", err)
}
Expand Down Expand Up @@ -109,6 +111,15 @@ func run() int {
opts.LanguageMap[m[0]] = ctags.StringToParser(m[1])
}

if heapProfileTrigger := os.Getenv("ZOEKT_HEAP_PROFILE_TRIGGER"); heapProfileTrigger != "" {
trigger, err := humanize.ParseBytes(heapProfileTrigger)
if err != nil {
log.Printf("invalid value for ZOEKT_HEAP_PROFILE_TRIGGER: %v", err)
} else {
opts.HeapProfileTriggerBytes = trigger
}
}

profiler.Init("zoekt-git-index")
exitStatus := 0
for dir, name := range gitRepos {
Expand Down
7 changes: 5 additions & 2 deletions gitindex/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -562,7 +562,7 @@ func indexGitRepo(opts Options, config gitIndexConfig) (bool, error) {
names = uniq(names)

log.Printf("attempting to index %d total files", totalFiles)
for _, name := range names {
for idx, name := range names {
keys := fileKeys[name]

for _, key := range keys {
Expand All @@ -574,9 +574,12 @@ func indexGitRepo(opts Options, config gitIndexConfig) (bool, error) {
if err := builder.Add(doc); err != nil {
return false, fmt.Errorf("error adding document with name %s: %w", key.FullPath(), err)
}

if idx%10_000 == 0 {
builder.CheckMemoryUsage()
}
}
}

return true, builder.Finish()
}

Expand Down
Loading