Skip to content

Commit

Permalink
indexserver: log name of freshly minted compound shard (#849)
Browse files Browse the repository at this point in the history
This updates zoekt-merge-index to print the full path of a new compound shard to stdout. Indexserver picks it up and logs it. This has the nice property that indexserver now has all the info. If we want to log this to a file in the future, we don't have to worry as much about competing writes to the file.

Together with a new log line in vacuum we can now follow the full lifecycle of a compound shard in the logs.

Test plan:
updated unit tests
  • Loading branch information
stefanhengl authored Oct 24, 2024
1 parent 9bdc562 commit b3b74a2
Show file tree
Hide file tree
Showing 4 changed files with 61 additions and 72 deletions.
27 changes: 16 additions & 11 deletions cmd/zoekt-merge-index/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,18 +11,20 @@ import (
"github.com/sourcegraph/zoekt"
)

func merge(dstDir string, names []string) error {
// merge merges the input shards into a compound shard in dstDir. It returns the
// full path to the compound shard. The input shards are removed on success.
func merge(dstDir string, names []string) (string, error) {
var files []zoekt.IndexFile
for _, fn := range names {
f, err := os.Open(fn)
if err != nil {
return err
return "", nil
}
defer f.Close()

indexFile, err := zoekt.NewIndexFile(f)
if err != nil {
return err
return "", err
}
defer indexFile.Close()

Expand All @@ -31,39 +33,40 @@ func merge(dstDir string, names []string) error {

tmpName, dstName, err := zoekt.Merge(dstDir, files...)
if err != nil {
return err
return "", err
}

// Delete input shards.
for _, name := range names {
paths, err := zoekt.IndexFilePaths(name)
if err != nil {
return fmt.Errorf("zoekt-merge-index: %w", err)
return "", fmt.Errorf("zoekt-merge-index: %w", err)
}
for _, p := range paths {
if err := os.Remove(p); err != nil {
return fmt.Errorf("zoekt-merge-index: failed to remove simple shard: %w", err)
return "", fmt.Errorf("zoekt-merge-index: failed to remove simple shard: %w", err)
}
}
}

// We only rename the compound shard if all simple shards could be deleted in the
// previous step. This guarantees we won't have duplicate indexes.
if err := os.Rename(tmpName, dstName); err != nil {
return fmt.Errorf("zoekt-merge-index: failed to rename compound shard: %w", err)
return "", fmt.Errorf("zoekt-merge-index: failed to rename compound shard: %w", err)
}
return nil

return dstName, nil
}

func mergeCmd(paths []string) error {
func mergeCmd(paths []string) (string, error) {
if paths[0] == "-" {
paths = []string{}
scanner := bufio.NewScanner(os.Stdin)
for scanner.Scan() {
paths = append(paths, strings.TrimSpace(scanner.Text()))
}
if err := scanner.Err(); err != nil {
return err
return "", err
}
log.Printf("merging %d paths from stdin", len(paths))
}
Expand Down Expand Up @@ -129,9 +132,11 @@ func explodeCmd(path string) error {
func main() {
switch subCommand := os.Args[1]; subCommand {
case "merge":
if err := mergeCmd(os.Args[2:]); err != nil {
compoundShardPath, err := mergeCmd(os.Args[2:])
if err != nil {
log.Fatal(err)
}
fmt.Println(compoundShardPath)
case "explode":
if err := explodeCmd(os.Args[2]); err != nil {
log.Fatal(err)
Expand Down
82 changes: 26 additions & 56 deletions cmd/zoekt-merge-index/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,104 +8,80 @@ import (
"sort"
"testing"

"github.com/stretchr/testify/require"

"github.com/sourcegraph/zoekt"
"github.com/sourcegraph/zoekt/query"
"github.com/sourcegraph/zoekt/shards"
)

// TestMerge copies the v16 test shards into a temporary directory, merges them
// into one compound shard, and checks both the name of the resulting shard and
// that the merged content can be searched.
func TestMerge(t *testing.T) {
	v16Shards, err := filepath.Glob("../../testdata/shards/*_v16.*.zoekt")
	require.NoError(t, err)
	sort.Strings(v16Shards)

	// merge deletes its inputs, so work on copies rather than on testdata.
	testShards, err := copyTestShards(t.TempDir(), v16Shards)
	require.NoError(t, err)
	t.Log(testShards)

	dir := t.TempDir()
	cs, err := merge(dir, testShards)
	require.NoError(t, err)
	// The name of the compound shard is based on the merged repos, so it should be
	// stable. require.Equal takes the expected value first and the actual value
	// second; keeping that order makes a failure message read correctly.
	require.Equal(t, "compound-ea9613e2ffba7d7361856aebfca75fb714856509_v17.00000.zoekt", filepath.Base(cs))

	ss, err := shards.NewDirectorySearcher(dir)
	require.NoError(t, err)
	defer ss.Close()

	q, err := query.Parse("hello")
	require.NoError(t, err)

	var sOpts zoekt.SearchOptions
	ctx := context.Background()
	result, err := ss.Search(ctx, q, &sOpts)
	require.NoError(t, err)

	// we are merging the same shard twice, so we expect the same file twice.
	require.Len(t, result.Files, 2)
}

// Merge 2 simple shards and then explode them.
func TestExplode(t *testing.T) {
v16Shards, err := filepath.Glob("../../testdata/shards/repo*_v16.*.zoekt")
if err != nil {
t.Fatal(err)
}
require.NoError(t, err)
sort.Strings(v16Shards)

testShards, err := copyTestShards(t.TempDir(), v16Shards)
if err != nil {
t.Fatal(err)
}
require.NoError(t, err)
t.Log(testShards)

dir := t.TempDir()
err = merge(dir, testShards)
if err != nil {
t.Fatal(err)
}
_, err = merge(dir, testShards)
require.NoError(t, err)

cs, err := filepath.Glob(filepath.Join(dir, "compound-*.zoekt"))
if err != nil {
t.Fatal(err)
}
require.NoError(t, err)
err = explode(dir, cs[0])
if err != nil {
t.Fatal(err)
}
require.NoError(t, err)

cs, err = filepath.Glob(filepath.Join(dir, "compound-*.zoekt"))
if err != nil {
t.Fatal(err)
}
require.NoError(t, err)

if len(cs) != 0 {
t.Fatalf("explode should have deleted the compound shard if it returned without error")
}

exploded, err := filepath.Glob(filepath.Join(dir, "*.zoekt"))
if err != nil {
t.Fatal(err)
}
require.NoError(t, err)

if len(exploded) != len(testShards) {
t.Fatalf("the number of simple shards before %d and after %d should be the same", len(testShards), len(exploded))
}

ss, err := shards.NewDirectorySearcher(dir)
if err != nil {
t.Fatalf("NewDirectorySearcher(%s): %v", dir, err)
}
require.NoError(t, err)
defer ss.Close()

var sOpts zoekt.SearchOptions
Expand All @@ -132,16 +108,10 @@ func TestExplode(t *testing.T) {
for _, c := range cases {
t.Run(c.searchLiteral, func(t *testing.T) {
q, err := query.Parse(c.searchLiteral)
if err != nil {
t.Fatalf("Parse(%s): %v", c.searchLiteral, err)
}
require.NoError(t, err)
result, err := ss.Search(ctx, q, &sOpts)
if err != nil {
t.Fatalf("Search(%v): %v", q, err)
}
if got := len(result.Files); got != c.wantResults {
t.Fatalf("wanted %d results, got %d", c.wantResults, got)
}
require.NoError(t, err)
require.Len(t, result.Files, c.wantResults)
})
}
}
Expand Down
5 changes: 3 additions & 2 deletions cmd/zoekt-sourcegraph-indexserver/cleanup.go
Original file line number Diff line number Diff line change
Expand Up @@ -422,8 +422,9 @@ func (s *Server) vacuum() {
})

if err != nil {
log.Printf("failed to explode compound shard %s: %s", path, string(b))
log.Printf("failed to explode compound shard: shard=%s out=%s err=%s", path, string(b), err)
}
log.Printf("exploded compound shard: shard=%s", path)
continue
}

Expand All @@ -432,7 +433,7 @@ func (s *Server) vacuum() {
})

if err != nil {
log.Printf("error while removing tombstones in %s: %s", fn, err)
log.Printf("error while removing tombstones in %s: %s", path, err)
}
}
}
Expand Down
19 changes: 16 additions & 3 deletions cmd/zoekt-sourcegraph-indexserver/merge.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package main

import (
"bytes"
"log"
"os"
"os/exec"
Expand Down Expand Up @@ -88,14 +89,26 @@ func (s *Server) merge(mergeCmd func(args ...string) *exec.Cmd) {
}

start := time.Now()
out, err := mergeCmd(paths...).CombinedOutput()

metricShardMergingDuration.WithLabelValues(strconv.FormatBool(err != nil)).Observe(time.Since(start).Seconds())
cmd := mergeCmd(paths...)

// zoekt-merge-index writes the full path of the new compound shard to stdout.
stdoutBuf := &bytes.Buffer{}
stderrBuf := &bytes.Buffer{}
cmd.Stdout = stdoutBuf
cmd.Stderr = stderrBuf

err := cmd.Run()

durationSeconds := time.Since(start).Seconds()
metricShardMergingDuration.WithLabelValues(strconv.FormatBool(err != nil)).Observe(durationSeconds)
if err != nil {
log.Printf("mergeCmd: out=%s, err=%s", out, err)
log.Printf("error merging shards: stdout=%s, stderr=%s, durationSeconds=%.2f err=%s", stdoutBuf.String(), stderrBuf.String(), durationSeconds, err)
return
}

log.Printf("finished merging: shard=%s durationSeconds=%.2f", stdoutBuf.String(), durationSeconds)

next = true
})
}
Expand Down

0 comments on commit b3b74a2

Please sign in to comment.