Skip to content

Commit

Permalink
Add verify-segment command (#151)
Browse files Browse the repository at this point in the history
Signed-off-by: Congqi Xia <[email protected]>
  • Loading branch information
congqixia authored Jun 12, 2023
1 parent e70cc6d commit 8fcf065
Show file tree
Hide file tree
Showing 5 changed files with 112 additions and 3 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,6 @@ bw_debug.log

# vim
*.sw[op]

# backup file
*.bak.gz
4 changes: 2 additions & 2 deletions states/etcd/common/collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ func GetCollectionByIDVersion(ctx context.Context, cli clientv3.KV, basePath str
if err != nil {
return nil, err
}
c := models.NewCollectionFromV2_1(info, string(resp.Kvs[0].Key))
c := models.NewCollectionFromV2_1(info, string(kv.Key))
return c, nil

case models.GTEVersion2_2:
Expand All @@ -199,7 +199,7 @@ func GetCollectionByIDVersion(ctx context.Context, cli clientv3.KV, basePath str
if err != nil {
return nil, err
}
c := models.NewCollectionFromV2_2(info, string(resp.Kvs[0].Key), fields)
c := models.NewCollectionFromV2_2(info, string(kv.Key), fields)
return c, nil
default:
return nil, errors.New("not supported version")
Expand Down
2 changes: 1 addition & 1 deletion states/etcd/common/segment.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func getSegmentLazyFunc(cli clientv3.KV, basePath string, segment datapbv2.Segme

prefix = path.Join(basePath, "datacoord-meta", fmt.Sprintf("statslog/%d/%d/%d", segment.CollectionID, segment.PartitionID, segment.ID))
statslogs, err := f(func(segment datapbv2.SegmentInfo, fieldID int64, logID int64) string {
return fmt.Sprintf("files/statslog/%d/%d/%d/%d", segment.CollectionID, segment.PartitionID, segment.ID, logID)
return fmt.Sprintf("files/stats_log/%d/%d/%d/%d/%d", segment.CollectionID, segment.PartitionID, segment.ID, fieldID, logID)
})
if err != nil {
return nil, nil, nil, err
Expand Down
3 changes: 3 additions & 0 deletions states/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,9 @@ func (s *instanceState) SetupCommands() {
// segment-loaded
GetDistributionCommand(cli, basePath),

//
getVerifySegmentCmd(cli, basePath),

// probe
GetProbeCmd(cli, basePath),

Expand Down
103 changes: 103 additions & 0 deletions states/verify_segment.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
package states

import (
"context"
"fmt"
"strings"

"github.com/milvus-io/birdwatcher/models"
"github.com/milvus-io/birdwatcher/states/etcd/common"
etcdversion "github.com/milvus-io/birdwatcher/states/etcd/version"
"github.com/minio/minio-go/v7"
"github.com/spf13/cobra"
clientv3 "go.etcd.io/etcd/client/v3"
)

func getVerifySegmentCmd(cli clientv3.KV, basePath string) *cobra.Command {
cmd := &cobra.Command{
Use: "verify-segment",
Short: "Verify segment file matches storage",
Run: func(cmd *cobra.Command, args []string) {
collectionID, err := cmd.Flags().GetInt64("collection")
if err != nil {
fmt.Println(err.Error())
return
}
patch, err := cmd.Flags().GetBool("patch")
if err != nil {
fmt.Println(err.Error())
return
}

segments, err := common.ListSegmentsVersion(context.Background(), cli, basePath, etcdversion.GetVersion(), func(seg *models.Segment) bool {
return seg.CollectionID == collectionID && seg.State == models.SegmentStateFlushed
})

if err != nil {
fmt.Println("failed to list segment info", err.Error())
}

minioClient, bucketName, err := getMinioAccess()
if err != nil {
fmt.Println("failed to get minio access", err.Error())
return
}

total := len(segments)
for idx, segment := range segments {
fmt.Printf("Start to verify segment(%d) --- (%d/%d)\n", segment.ID, idx, total)
type item struct {
tag string
fieldBinlogs []*models.FieldBinlog
}
items := []item{
{tag: "binlog", fieldBinlogs: segment.GetBinlogs()},
{tag: "statslog", fieldBinlogs: segment.GetStatslogs()},
{tag: "deltalog", fieldBinlogs: segment.GetDeltalogs()},
}
for _, item := range items {
for _, statslog := range item.fieldBinlogs {
for _, l := range statslog.Binlogs {
_, err := minioClient.StatObject(context.Background(), bucketName, l.LogPath, minio.StatObjectOptions{})
if err != nil {
errResp := minio.ToErrorResponse(err)
if errResp.Code != "NoSuchKey" {
fmt.Println("failed to stat object in minio", err.Error())
continue
}
if !patch {
fmt.Println("file not exists in minio", l.LogPath)
continue
}
// try to patch 01 => 1 bug
if item.tag == "statslog" && strings.HasSuffix(l.LogPath, "/1") {
currentObjectPath := strings.TrimSuffix(l.LogPath, "/1") + "/01"
_, err = minioClient.StatObject(context.Background(), bucketName, currentObjectPath, minio.StatObjectOptions{})
if err != nil {
fmt.Println(currentObjectPath, "also not exists")
continue
}
fmt.Printf("current statslog(%s) for (%s) found, try to copy object", currentObjectPath, l.LogPath)
minioClient.CopyObject(context.Background(), minio.CopyDestOptions{
Bucket: bucketName,
Object: l.LogPath,
}, minio.CopySrcOptions{
Bucket: bucketName,
Object: currentObjectPath,
})
}
}
}
}

fmt.Printf("Segment(%d) %s done\n", segment.ID, item.tag)
}

}
},
}

cmd.Flags().Int64("collection", 0, "collection id")
cmd.Flags().Bool("patch", false, "try to patch with known issue logic")
return cmd
}

0 comments on commit 8fcf065

Please sign in to comment.