Skip to content

Commit

Permalink
Correct the metadata values by subtracting removedOffsetsCounter
Browse files Browse the repository at this point in the history
  • Loading branch information
M. Mert Yildiran committed Nov 11, 2021
1 parent 0ec839c commit 0777e04
Showing 1 changed file with 11 additions and 3 deletions.
14 changes: 11 additions & 3 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -680,6 +680,9 @@ func streamRecords(conn net.Conn, data []byte) (err error) {
rlimitPartitionRefQueue = make([]int64, 0)
}

// removedCounter keeps track of how many offsets belong to a removed partition.
var removedOffsetsCounter int

for {
// f is the current partition we're reading the data from.
var f *os.File
Expand Down Expand Up @@ -727,6 +730,7 @@ func streamRecords(conn net.Conn, data []byte) (err error) {

// File descriptor nil means; the partition is removed. So we pass this offset.
if fRef == nil {
removedOffsetsCounter++
continue
}

Expand Down Expand Up @@ -779,13 +783,17 @@ func streamRecords(conn net.Conn, data []byte) (err error) {
}
}

// Correct the metadata values by subtracting removedOffsetsCounter
realCurrent := leftOff - int64(removedOffsetsCounter)
realTotal := totalNumberOfRecords - removedOffsetsCounter

metadata, _ = json.Marshal(Metadata{
NumberOfWritten: numberOfWritten,
Current: uint64(leftOff),
Total: uint64(totalNumberOfRecords),
Current: uint64(realCurrent),
Total: uint64(realTotal),
})

if totalNumberOfRecords < 100 || int(leftOff)%(totalNumberOfRecords/100) == 0 {
if realTotal < 100 || int(realCurrent)%(realTotal/100) == 0 {
_, err = conn.Write([]byte(fmt.Sprintf("%s %s\n", CMD_METADATA, string(metadata))))
if err != nil {
log.Printf("Write error: %v\n", err)
Expand Down

0 comments on commit 0777e04

Please sign in to comment.