Skip to content

Commit

Permalink
meta/backup: fix slice conflict (#5361)
Browse files Browse the repository at this point in the history
Signed-off-by: jiefenghuang <[email protected]>
  • Loading branch information
jiefenghuang authored Dec 11, 2024
1 parent 9c19b3d commit d1c00a8
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 37 deletions.
21 changes: 8 additions & 13 deletions pkg/meta/redis_bak.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,8 +241,8 @@ func (m *redisMeta) dumpDelFiles(ctx Context, opt *DumpOption, ch chan<- *dumped
return err
}

delFiles := make([]*pb.DelFile, 0, len(zs))
for _, z := range zs {
delFiles := make([]*pb.DelFile, 0, utils.Min(len(zs), redisBatchSize))
for i, z := range zs {
parts := strings.Split(z.Member.(string), ":")
if len(parts) != 2 {
logger.Warnf("invalid delfile string: %s", z.Member.(string))
Expand All @@ -255,7 +255,7 @@ func (m *redisMeta) dumpDelFiles(ctx Context, opt *DumpOption, ch chan<- *dumped
if err := dumpResult(ctx, ch, &dumpedResult{msg: &pb.Batch{Delfiles: delFiles}}); err != nil {
return err
}
delFiles = delFiles[:0]
delFiles = make([]*pb.DelFile, 0, utils.Min(len(zs)-i-1, redisBatchSize))
}
}
return dumpResult(ctx, ch, &dumpedResult{msg: &pb.Batch{Delfiles: delFiles}})
Expand Down Expand Up @@ -285,7 +285,7 @@ func (m *redisMeta) dumpSliceRef(ctx Context, opt *DumpOption, ch chan<- *dumped
if err := dumpResult(ctx, ch, &dumpedResult{msg: &pb.Batch{SliceRefs: sliceRefs}}); err != nil {
return err
}
sliceRefs = sliceRefs[:0]
sliceRefs = make([]*pb.SliceRef, 0, 1024)
}
}
}
Expand Down Expand Up @@ -434,14 +434,16 @@ func (m *redisMeta) dumpDirStat(ctx Context, opt *DumpOption, ch chan<- *dumpedR
}
}

ss := make([]*pb.Stat, 0, len(stats))
ss := make([]*pb.Stat, 0, utils.Min(len(stats), redisBatchSize))
cnt := 0
for _, s := range stats {
cnt++
ss = append(ss, s)
if len(ss) >= redisBatchSize {
if err := dumpResult(ctx, ch, &dumpedResult{msg: &pb.Batch{Dirstats: ss}}); err != nil {
return err
}
ss = ss[:0]
ss = make([]*pb.Stat, 0, utils.Min(len(stats)-cnt, redisBatchSize))
}
}
return dumpResult(ctx, ch, &dumpedResult{msg: &pb.Batch{Dirstats: ss}})
Expand Down Expand Up @@ -601,13 +603,6 @@ func (m *redisMeta) dumpXattrs(ctx context.Context, ch chan<- *dumpedResult, key
xattr.Name = k
xattr.Value = []byte(v)
xattrs = append(xattrs, xattr)
if len(xattrs) >= redisBatchSize {
if err := dumpResult(ctx, ch, &dumpedResult{&pb.Batch{Xattrs: xattrs}, rel}); err != nil {
return err
}
sum.Add(uint64(len(xattrs)))
xattrs = xattrs[:0]
}
}
}
sum.Add(uint64(len(xattrs)))
Expand Down
35 changes: 11 additions & 24 deletions pkg/meta/sql_bak.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,6 @@ func (m *dbMeta) dumpNodes(ctx Context, opt *DumpOption, ch chan<- *dumpedResult

var rows []node
if err := m.roTxn(ctx, func(s *xorm.Session) error {
rows = rows[:0]
return s.Where("inode >= ?", TrashInode).Find(&rows)
}); err != nil {
return err
Expand Down Expand Up @@ -132,7 +131,6 @@ func (m *dbMeta) dumpNodes(ctx Context, opt *DumpOption, ch chan<- *dumpedResult
return sqlQueryBatch(ctx, opt, maxInode, func(ctx context.Context, start, end uint64) (int, error) {
var rows []node
if err := m.roTxn(ctx, func(s *xorm.Session) error {
rows = rows[:0]
return s.Where("inode >= ? AND inode < ?", start, end).Find(&rows)
}); err != nil {
return 0, err
Expand Down Expand Up @@ -174,7 +172,6 @@ func (m *dbMeta) dumpChunks(ctx Context, opt *DumpOption, ch chan<- *dumpedResul
return sqlQueryBatch(ctx, opt, maxId, func(ctx context.Context, start, end uint64) (int, error) {
var rows []chunk
if err := m.roTxn(ctx, func(s *xorm.Session) error {
rows = rows[:0]
return s.Where("id >= ? AND id < ?", start, end).Find(&rows)
}); err != nil {
return 0, err
Expand Down Expand Up @@ -217,7 +214,6 @@ func (m *dbMeta) dumpEdges(ctx Context, opt *DumpOption, ch chan<- *dumpedResult
err = sqlQueryBatch(ctx, opt, maxId, func(ctx context.Context, start, end uint64) (int, error) {
var rows []edge
if err := m.roTxn(ctx, func(s *xorm.Session) error {
rows = rows[:0]
return s.Where("id >= ? AND id < ?", start, end).Find(&rows)
}); err != nil {
return 0, err
Expand Down Expand Up @@ -258,7 +254,7 @@ func (m *dbMeta) dumpEdges(ctx Context, opt *DumpOption, ch chan<- *dumpedResult
if err := dumpResult(ctx, ch, &dumpedResult{msg: &pb.Batch{Parents: parents}}); err != nil {
return err
}
parents = parents[:0]
parents = make([]*pb.Parent, 0, sqlDumpBatchSize)
}
}
return dumpResult(ctx, ch, &dumpedResult{msg: &pb.Batch{Parents: parents}})
Expand All @@ -267,20 +263,19 @@ func (m *dbMeta) dumpEdges(ctx Context, opt *DumpOption, ch chan<- *dumpedResult
func (m *dbMeta) dumpSymlinks(ctx Context, opt *DumpOption, ch chan<- *dumpedResult) error {
var rows []symlink
if err := m.roTxn(ctx, func(s *xorm.Session) error {
rows = rows[:0]
return s.Find(&rows)
}); err != nil {
return err
}

symlinks := make([]*pb.Symlink, 0, min(len(rows), sqlDumpBatchSize))
for _, r := range rows {
for i, r := range rows {
symlinks = append(symlinks, &pb.Symlink{Inode: uint64(r.Inode), Target: r.Target})
if len(symlinks) >= sqlDumpBatchSize {
if err := dumpResult(ctx, ch, &dumpedResult{msg: &pb.Batch{Symlinks: symlinks}}); err != nil {
return err
}
symlinks = symlinks[:0]
symlinks = make([]*pb.Symlink, 0, min(len(rows)-i-1, sqlDumpBatchSize))
}
}
return dumpResult(ctx, ch, &dumpedResult{msg: &pb.Batch{Symlinks: symlinks}})
Expand All @@ -289,7 +284,6 @@ func (m *dbMeta) dumpSymlinks(ctx Context, opt *DumpOption, ch chan<- *dumpedRes
func (m *dbMeta) dumpCounters(ctx Context, opt *DumpOption, ch chan<- *dumpedResult) error {
var rows []counter
if err := m.roTxn(ctx, func(s *xorm.Session) error {
rows = rows[:0]
return s.Find(&rows)
}); err != nil {
return err
Expand All @@ -304,7 +298,6 @@ func (m *dbMeta) dumpCounters(ctx Context, opt *DumpOption, ch chan<- *dumpedRes
func (m *dbMeta) dumpSustained(ctx Context, opt *DumpOption, ch chan<- *dumpedResult) error {
var rows []sustained
if err := m.roTxn(ctx, func(s *xorm.Session) error {
rows = rows[:0]
return s.Find(&rows)
}); err != nil {
return err
Expand All @@ -323,19 +316,18 @@ func (m *dbMeta) dumpSustained(ctx Context, opt *DumpOption, ch chan<- *dumpedRe
func (m *dbMeta) dumpDelFiles(ctx Context, opt *DumpOption, ch chan<- *dumpedResult) error {
var rows []delfile
if err := m.roTxn(ctx, func(s *xorm.Session) error {
rows = rows[:0]
return s.Find(&rows)
}); err != nil {
return err
}
delFiles := make([]*pb.DelFile, 0, min(sqlDumpBatchSize, len(rows)))
for _, row := range rows {
for i, row := range rows {
delFiles = append(delFiles, &pb.DelFile{Inode: uint64(row.Inode), Length: row.Length, Expire: row.Expire})
if len(delFiles) >= sqlDumpBatchSize {
if err := dumpResult(ctx, ch, &dumpedResult{msg: &pb.Batch{Delfiles: delFiles}}); err != nil {
return err
}
delFiles = delFiles[:0]
delFiles = make([]*pb.DelFile, 0, min(sqlDumpBatchSize, len(rows)-i-1))
}
}
return dumpResult(ctx, ch, &dumpedResult{msg: &pb.Batch{Delfiles: delFiles}})
Expand All @@ -344,19 +336,18 @@ func (m *dbMeta) dumpDelFiles(ctx Context, opt *DumpOption, ch chan<- *dumpedRes
func (m *dbMeta) dumpSliceRef(ctx Context, opt *DumpOption, ch chan<- *dumpedResult) error {
var rows []sliceRef
if err := m.roTxn(ctx, func(s *xorm.Session) error {
rows = rows[:0]
return s.Where("refs != 1").Find(&rows) // skip default refs
}); err != nil {
return err
}
sliceRefs := make([]*pb.SliceRef, 0, min(sqlDumpBatchSize, len(rows)))
for _, sr := range rows {
for i, sr := range rows {
sliceRefs = append(sliceRefs, &pb.SliceRef{Id: sr.Id, Size: sr.Size, Refs: int64(sr.Refs)})
if len(sliceRefs) >= sqlDumpBatchSize {
if err := dumpResult(ctx, ch, &dumpedResult{msg: &pb.Batch{SliceRefs: sliceRefs}}); err != nil {
return err
}
sliceRefs = sliceRefs[:0]
sliceRefs = make([]*pb.SliceRef, 0, min(sqlDumpBatchSize, len(rows)-i-1))
}
}
return dumpResult(ctx, ch, &dumpedResult{msg: &pb.Batch{SliceRefs: sliceRefs}})
Expand All @@ -365,7 +356,6 @@ func (m *dbMeta) dumpSliceRef(ctx Context, opt *DumpOption, ch chan<- *dumpedRes
func (m *dbMeta) dumpACL(ctx Context, opt *DumpOption, ch chan<- *dumpedResult) error {
var rows []acl
if err := m.roTxn(ctx, func(s *xorm.Session) error {
rows = rows[:0]
return s.Find(&rows)
}); err != nil {
return err
Expand All @@ -383,13 +373,12 @@ func (m *dbMeta) dumpACL(ctx Context, opt *DumpOption, ch chan<- *dumpedResult)
func (m *dbMeta) dumpXattr(ctx Context, opt *DumpOption, ch chan<- *dumpedResult) error {
var rows []xattr
if err := m.roTxn(ctx, func(s *xorm.Session) error {
rows = rows[:0]
return s.Find(&rows)
}); err != nil {
return err
}
xattrs := make([]*pb.Xattr, 0, min(sqlDumpBatchSize, len(rows)))
for _, x := range rows {
for i, x := range rows {
xattrs = append(xattrs, &pb.Xattr{
Inode: uint64(x.Inode),
Name: x.Name,
Expand All @@ -399,7 +388,7 @@ func (m *dbMeta) dumpXattr(ctx Context, opt *DumpOption, ch chan<- *dumpedResult
if err := dumpResult(ctx, ch, &dumpedResult{msg: &pb.Batch{Xattrs: xattrs}}); err != nil {
return err
}
xattrs = xattrs[:0]
xattrs = make([]*pb.Xattr, 0, min(sqlDumpBatchSize, len(rows)-i-1))
}
}
return dumpResult(ctx, ch, &dumpedResult{msg: &pb.Batch{Xattrs: xattrs}})
Expand All @@ -408,7 +397,6 @@ func (m *dbMeta) dumpXattr(ctx Context, opt *DumpOption, ch chan<- *dumpedResult
func (m *dbMeta) dumpQuota(ctx Context, opt *DumpOption, ch chan<- *dumpedResult) error {
var rows []dirQuota
if err := m.roTxn(ctx, func(s *xorm.Session) error {
rows = rows[:0]
return s.Find(&rows)
}); err != nil {
return err
Expand All @@ -429,13 +417,12 @@ func (m *dbMeta) dumpQuota(ctx Context, opt *DumpOption, ch chan<- *dumpedResult
func (m *dbMeta) dumpDirStat(ctx Context, opt *DumpOption, ch chan<- *dumpedResult) error {
var rows []dirStats
if err := m.roTxn(ctx, func(s *xorm.Session) error {
rows = rows[:0]
return s.Find(&rows)
}); err != nil {
return err
}
dirStats := make([]*pb.Stat, 0, min(sqlDumpBatchSize, len(rows)))
for _, st := range rows {
for i, st := range rows {
dirStats = append(dirStats, &pb.Stat{
Inode: uint64(st.Inode),
DataLength: st.DataLength,
Expand All @@ -446,7 +433,7 @@ func (m *dbMeta) dumpDirStat(ctx Context, opt *DumpOption, ch chan<- *dumpedResu
if err := dumpResult(ctx, ch, &dumpedResult{msg: &pb.Batch{Dirstats: dirStats}}); err != nil {
return err
}
dirStats = dirStats[:0]
dirStats = make([]*pb.Stat, 0, min(sqlDumpBatchSize, len(rows)-i-1))
}
}
return dumpResult(ctx, ch, &dumpedResult{msg: &pb.Batch{Dirstats: dirStats}})
Expand Down

0 comments on commit d1c00a8

Please sign in to comment.