diff --git a/server/server.go b/server/server.go index 57c644c..65d831a 100644 --- a/server/server.go +++ b/server/server.go @@ -807,15 +807,35 @@ func streamRecords(conn net.Conn, data []byte) (err error) { rlimit = 0 } - if len(metadata) > 0 { - _, err = conn.Write([]byte(fmt.Sprintf("%s %s\n", CMD_METADATA, string(metadata)))) - if err != nil { - log.Printf("Write error: %v\n", err) + // Meta periodic update provides a way to synchronize desynced data and meta channels + // on clients in case of calm streams. Such that it sends a metadata update to + // the client for each 0.5 seconds until a partition is modified. Means that + // it continues until a new data is inserted into the database. + stopMetaPeriodicUpdate := false + go func() { + for { + if stopMetaPeriodicUpdate { + break + } + if len(metadata) > 0 { + _, err = conn.Write([]byte(fmt.Sprintf("%s %s\n", CMD_METADATA, string(metadata)))) + if err != nil { + log.Printf("Write error: %v\n", err) + } + + if stopMetaPeriodicUpdate { + break + } + } + time.Sleep(500 * time.Millisecond) } - } + }() // Block until a partition is modified watchPartitions() + + // Stop the meta periodic update + stopMetaPeriodicUpdate = true } }