From 0777e046923f83dfa08d29d5224b6e5c6cd44636 Mon Sep 17 00:00:00 2001 From: "M. Mert Yildiran" Date: Thu, 11 Nov 2021 15:50:26 +0300 Subject: [PATCH] Correct the metadata values by subtracting `removedOffsetsCounter` --- server/server.go | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/server/server.go b/server/server.go index a3973cc..7750d52 100644 --- a/server/server.go +++ b/server/server.go @@ -680,6 +680,9 @@ func streamRecords(conn net.Conn, data []byte) (err error) { rlimitPartitionRefQueue = make([]int64, 0) } + // removedCounter keeps track of how many offsets belong to a removed partition. + var removedOffsetsCounter int + for { // f is the current partition we're reading the data from. var f *os.File @@ -727,6 +730,7 @@ func streamRecords(conn net.Conn, data []byte) (err error) { // File descriptor nil means; the partition is removed. So we pass this offset. if fRef == nil { + removedOffsetsCounter++ continue } @@ -779,13 +783,17 @@ func streamRecords(conn net.Conn, data []byte) (err error) { } } + // Correct the metadata values by subtracting removedOffsetsCounter + realCurrent := leftOff - int64(removedOffsetsCounter) + realTotal := totalNumberOfRecords - removedOffsetsCounter + metadata, _ = json.Marshal(Metadata{ NumberOfWritten: numberOfWritten, - Current: uint64(leftOff), - Total: uint64(totalNumberOfRecords), + Current: uint64(realCurrent), + Total: uint64(realTotal), }) - if totalNumberOfRecords < 100 || int(leftOff)%(totalNumberOfRecords/100) == 0 { + if realTotal < 100 || int(realCurrent)%(realTotal/100) == 0 { _, err = conn.Write([]byte(fmt.Sprintf("%s %s\n", CMD_METADATA, string(metadata)))) if err != nil { log.Printf("Write error: %v\n", err)