Skip to content

Commit

Permalink
Implement meta periodic update for providing a way to synchronize des…
Browse files Browse the repository at this point in the history
…ynced data and meta channels
  • Loading branch information
M. Mert Yildiran committed Nov 14, 2021
1 parent 9979dbb commit 22d4d27
Showing 1 changed file with 25 additions and 5 deletions.
30 changes: 25 additions & 5 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -807,15 +807,35 @@ func streamRecords(conn net.Conn, data []byte) (err error) {
rlimit = 0
}

if len(metadata) > 0 {
_, err = conn.Write([]byte(fmt.Sprintf("%s %s\n", CMD_METADATA, string(metadata))))
if err != nil {
log.Printf("Write error: %v\n", err)
// Meta periodic update provides a way to synchronize desynced data and meta channels
// on clients in case of calm streams. Such that it sends a metadata update to
// the client for each 0.5 seconds until a partition is modified. Means that
// it continues until a new data is inserted into the database.
stopMetaPeriodicUpdate := false
go func() {
for {
if stopMetaPeriodicUpdate {
break
}
if len(metadata) > 0 {
_, err = conn.Write([]byte(fmt.Sprintf("%s %s\n", CMD_METADATA, string(metadata))))
if err != nil {
log.Printf("Write error: %v\n", err)
}

if stopMetaPeriodicUpdate {
break
}
}
time.Sleep(500 * time.Millisecond)
}
}
}()

// Block until a partition is modified
watchPartitions()

// Stop the meta periodic update
stopMetaPeriodicUpdate = true
}
}

Expand Down

0 comments on commit 22d4d27

Please sign in to comment.