Skip to content

Commit

Permalink
indexserver: log name of freshly minted compound shard
Browse files Browse the repository at this point in the history
This updates zoekt-merge-index to print the name of new compound shard
to stdout. Indexserver picks it up and logs it.

Together with a new log line in `vacuum` we can now follow the full
lifecycle of a compound shard in the logs.

Test plan:
updated unit tests
  • Loading branch information
stefanhengl committed Oct 24, 2024
1 parent 9bdc562 commit 31a960a
Show file tree
Hide file tree
Showing 4 changed files with 59 additions and 71 deletions.
27 changes: 16 additions & 11 deletions cmd/zoekt-merge-index/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,18 +11,20 @@ import (
"github.com/sourcegraph/zoekt"
)

func merge(dstDir string, names []string) error {
// merge merges the input shards into a compound shard in dstDir. It returns the
// full path to the compound shard. The input shards are removed on success.
func merge(dstDir string, names []string) (string, error) {
var files []zoekt.IndexFile
for _, fn := range names {
f, err := os.Open(fn)
if err != nil {
return err
return "", nil
}
defer f.Close()

indexFile, err := zoekt.NewIndexFile(f)
if err != nil {
return err
return "", err
}
defer indexFile.Close()

Expand All @@ -31,39 +33,40 @@ func merge(dstDir string, names []string) error {

tmpName, dstName, err := zoekt.Merge(dstDir, files...)
if err != nil {
return err
return "", err
}

// Delete input shards.
for _, name := range names {
paths, err := zoekt.IndexFilePaths(name)
if err != nil {
return fmt.Errorf("zoekt-merge-index: %w", err)
return "", fmt.Errorf("zoekt-merge-index: %w", err)
}
for _, p := range paths {
if err := os.Remove(p); err != nil {
return fmt.Errorf("zoekt-merge-index: failed to remove simple shard: %w", err)
return "", fmt.Errorf("zoekt-merge-index: failed to remove simple shard: %w", err)
}
}
}

// We only rename the compound shard if all simple shards could be deleted in the
// previous step. This guarantees we won't have duplicate indexes.
if err := os.Rename(tmpName, dstName); err != nil {
return fmt.Errorf("zoekt-merge-index: failed to rename compound shard: %w", err)
return "", fmt.Errorf("zoekt-merge-index: failed to rename compound shard: %w", err)
}
return nil

return dstName, nil
}

func mergeCmd(paths []string) error {
func mergeCmd(paths []string) (string, error) {
if paths[0] == "-" {
paths = []string{}
scanner := bufio.NewScanner(os.Stdin)
for scanner.Scan() {
paths = append(paths, strings.TrimSpace(scanner.Text()))
}
if err := scanner.Err(); err != nil {
return err
return "", err
}
log.Printf("merging %d paths from stdin", len(paths))
}
Expand Down Expand Up @@ -129,9 +132,11 @@ func explodeCmd(path string) error {
func main() {
switch subCommand := os.Args[1]; subCommand {
case "merge":
if err := mergeCmd(os.Args[2:]); err != nil {
compoundShardPath, err := mergeCmd(os.Args[2:])
if err != nil {
log.Fatal(err)
}
fmt.Println(compoundShardPath)
case "explode":
if err := explodeCmd(os.Args[2]); err != nil {
log.Fatal(err)
Expand Down
82 changes: 26 additions & 56 deletions cmd/zoekt-merge-index/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,104 +8,80 @@ import (
"sort"
"testing"

"github.com/stretchr/testify/require"

"github.com/sourcegraph/zoekt"
"github.com/sourcegraph/zoekt/query"
"github.com/sourcegraph/zoekt/shards"
)

func TestMerge(t *testing.T) {
v16Shards, err := filepath.Glob("../../testdata/shards/*_v16.*.zoekt")
if err != nil {
t.Fatal(err)
}
require.NoError(t, err)
sort.Strings(v16Shards)

testShards, err := copyTestShards(t.TempDir(), v16Shards)
if err != nil {
t.Fatal(err)
}
require.NoError(t, err)
t.Log(testShards)

dir := t.TempDir()
err = merge(dir, testShards)
if err != nil {
t.Fatal(err)
}
cs, err := merge(dir, testShards)
require.NoError(t, err)
// The name of the compound shard is based on the merged repos, so it should be
// stable
require.Equal(t, filepath.Base(cs), "compound-ea9613e2ffba7d7361856aebfca75fb714856509_v17.00000.zoekt")

ss, err := shards.NewDirectorySearcher(dir)
if err != nil {
t.Fatalf("NewDirectorySearcher(%s): %v", dir, err)
}
require.NoError(t, err)
defer ss.Close()

q, err := query.Parse("hello")
if err != nil {
t.Fatalf("Parse(hello): %v", err)
}
require.NoError(t, err)

var sOpts zoekt.SearchOptions
ctx := context.Background()
result, err := ss.Search(ctx, q, &sOpts)
if err != nil {
t.Fatalf("Search(%v): %v", q, err)
}
require.NoError(t, err)

// we are merging the same shard twice, so we expect the same file twice.
if len(result.Files) != 2 {
t.Errorf("got %v, want 2 files.", result.Files)
}
require.Len(t, result.Files, 2)
}

// Merge 2 simple shards and then explode them.
func TestExplode(t *testing.T) {
v16Shards, err := filepath.Glob("../../testdata/shards/repo*_v16.*.zoekt")
if err != nil {
t.Fatal(err)
}
require.NoError(t, err)
sort.Strings(v16Shards)

testShards, err := copyTestShards(t.TempDir(), v16Shards)
if err != nil {
t.Fatal(err)
}
require.NoError(t, err)
t.Log(testShards)

dir := t.TempDir()
err = merge(dir, testShards)
if err != nil {
t.Fatal(err)
}
_, err = merge(dir, testShards)
require.NoError(t, err)

cs, err := filepath.Glob(filepath.Join(dir, "compound-*.zoekt"))
if err != nil {
t.Fatal(err)
}
require.NoError(t, err)
err = explode(dir, cs[0])
if err != nil {
t.Fatal(err)
}
require.NoError(t, err)

cs, err = filepath.Glob(filepath.Join(dir, "compound-*.zoekt"))
if err != nil {
t.Fatal(err)
}
require.NoError(t, err)

if len(cs) != 0 {
t.Fatalf("explode should have deleted the compound shard if it returned without error")
}

exploded, err := filepath.Glob(filepath.Join(dir, "*.zoekt"))
if err != nil {
t.Fatal(err)
}
require.NoError(t, err)

if len(exploded) != len(testShards) {
t.Fatalf("the number of simple shards before %d and after %d should be the same", len(testShards), len(exploded))
}

ss, err := shards.NewDirectorySearcher(dir)
if err != nil {
t.Fatalf("NewDirectorySearcher(%s): %v", dir, err)
}
require.NoError(t, err)
defer ss.Close()

var sOpts zoekt.SearchOptions
Expand All @@ -132,16 +108,10 @@ func TestExplode(t *testing.T) {
for _, c := range cases {
t.Run(c.searchLiteral, func(t *testing.T) {
q, err := query.Parse(c.searchLiteral)
if err != nil {
t.Fatalf("Parse(%s): %v", c.searchLiteral, err)
}
require.NoError(t, err)
result, err := ss.Search(ctx, q, &sOpts)
if err != nil {
t.Fatalf("Search(%v): %v", q, err)
}
if got := len(result.Files); got != c.wantResults {
t.Fatalf("wanted %d results, got %d", c.wantResults, got)
}
require.NoError(t, err)
require.Len(t, result.Files, c.wantResults)
})
}
}
Expand Down
5 changes: 3 additions & 2 deletions cmd/zoekt-sourcegraph-indexserver/cleanup.go
Original file line number Diff line number Diff line change
Expand Up @@ -422,8 +422,9 @@ func (s *Server) vacuum() {
})

if err != nil {
log.Printf("failed to explode compound shard %s: %s", path, string(b))
log.Printf("failed to explode compound shard: shard=%s out=%s err=%s", path, string(b), err)
}
log.Printf("exploded compound shard: shard=%s", path)
continue
}

Expand All @@ -432,7 +433,7 @@ func (s *Server) vacuum() {
})

if err != nil {
log.Printf("error while removing tombstones in %s: %s", fn, err)
log.Printf("error while removing tombstones in %s: %s", path, err)
}
}
}
Expand Down
16 changes: 14 additions & 2 deletions cmd/zoekt-sourcegraph-indexserver/merge.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package main

import (
"bytes"
"log"
"os"
"os/exec"
Expand Down Expand Up @@ -88,14 +89,25 @@ func (s *Server) merge(mergeCmd func(args ...string) *exec.Cmd) {
}

start := time.Now()
out, err := mergeCmd(paths...).CombinedOutput()

cmd := mergeCmd(paths...)

// zoekt-merge-index writes the full path of the new compound shard to stdout.
stdoutBuf := &bytes.Buffer{}
stderrBuf := &bytes.Buffer{}
cmd.Stdout = stdoutBuf
cmd.Stderr = stderrBuf

err := cmd.Run()

metricShardMergingDuration.WithLabelValues(strconv.FormatBool(err != nil)).Observe(time.Since(start).Seconds())
if err != nil {
log.Printf("mergeCmd: out=%s, err=%s", out, err)
log.Printf("error merging shards: stdout=%s, stderr=%s, err=%s", stdoutBuf.String(), stderrBuf.String(), err)
return
}

log.Printf("finished merging: shard=%s durationSeconds=%.2f", stdoutBuf.String(), time.Since(start).Seconds())

next = true
})
}
Expand Down

0 comments on commit 31a960a

Please sign in to comment.