From 444354ee19c6a31f07844eea52c058bef568139d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?M=2E=20Mert=20Y=C4=B1ld=C4=B1ran?= Date: Thu, 17 Mar 2022 00:28:01 -0700 Subject: [PATCH] Add `FLUSH` and `RESET` connection modes (#28) * Add `FLUSH` and `RESET` connection modes * Fix the data race --- README.md | 25 ++++++++++ client/go/client.go | 53 ++++++++++++++++++++++ client/go/client_test.go | 10 ++++ server/server.go | 81 +++++++++++++++++++++++++++++---- server/server_test.go | 98 ++++++++++++++++++++++++++++++++++++++++ 5 files changed, 259 insertions(+), 8 deletions(-) diff --git a/README.md b/README.md index 654f411..49256f2 100644 --- a/README.md +++ b/README.md @@ -64,6 +64,11 @@ starting from a certain offset, supporting both directions. The disk usage ranges between `50000000` (50MB) and `100000000` (100MB). So the actual effective limit is the half of this value. +- **Flush mode** is a short lasting TCP connection mode that removes all the records in the database. + +- **Reset mode** is a short lasting TCP connection mode that removes all the records in the database +and resets the core into its initial state. + ### Query Querying achieved through a filter syntax named **Basenine Filter Language (BFL)**. It enables the user to query the traffic logs efficiently and precisely. @@ -208,3 +213,23 @@ if err != nil { // err can only be a connection error } ``` + +Flush: + +```go +// Remove all the records +err := Flush("localhost", "9099") +if err != nil { + // err can only be a connection error +} +``` + +Reset: + +```go +// Reset the database into its initial state +err := Reset("localhost", "9099") +if err != nil { + // err can only be a connection error +} +``` diff --git a/client/go/client.go b/client/go/client.go index a896122..b2ff612 100644 --- a/client/go/client.go +++ b/client/go/client.go @@ -38,6 +38,8 @@ const ( CMD_MACRO string = "/macro" CMD_LIMIT string = "/limit" CMD_METADATA string = "/metadata" + CMD_FLUSH string = "/flush" + CMD_RESET string = "/reset" ) // Closing indicators @@ -287,6 +289,57 @@ func Limit(host string, port string, limit int64) (err error) { return } +// Flush removes all the records in the database. +func Flush(host string, port string) (err error) { + var c *Connection + c, err = NewConnection(host, port) + if err != nil { + return + } + + ret := make(chan []byte) + + var wg sync.WaitGroup + go readConnection(&wg, c, ret, nil) + wg.Add(1) + + c.SendText(CMD_FLUSH) + + data := <-ret + text := string(data) + if text != "OK" { + err = errors.New(text) + } + c.Close() + return +} + +// Reset removes all the records in the database +// and resets the core into its initial state. +func Reset(host string, port string) (err error) { + var c *Connection + c, err = NewConnection(host, port) + if err != nil { + return + } + + ret := make(chan []byte) + + var wg sync.WaitGroup + go readConnection(&wg, c, ret, nil) + wg.Add(1) + + c.SendText(CMD_RESET) + + data := <-ret + text := string(data) + if text != "OK" { + err = errors.New(text) + } + c.Close() + return +} + // readConnection is a Goroutine that recieves messages from the TCP connection // and sends them to a []byte channel provided by the data parameter. func readConnection(wg *sync.WaitGroup, c *Connection, data chan []byte, meta chan []byte) { diff --git a/client/go/client_test.go b/client/go/client_test.go index 38edbbc..fcca1d1 100644 --- a/client/go/client_test.go +++ b/client/go/client_test.go @@ -157,6 +157,16 @@ func TestFetch(t *testing.T) { } } +func TestFlush(t *testing.T) { + err := Flush(HOST, PORT) + assert.Nil(t, err) +} + +func TestReset(t *testing.T) { + err := Reset(HOST, PORT) + assert.Nil(t, err) +} + func TestTCPConnectionLeak(t *testing.T) { for i := 0; i < 10000; i++ { err := Validate(HOST, PORT, `brand.name == "Chevrolet"`) diff --git a/server/server.go b/server/server.go index d5ec8a2..13d490f 100644 --- a/server/server.go +++ b/server/server.go @@ -68,11 +68,16 @@ type ConnectionMode int // // VALIDATE is a short lasting TCP connection mode for validating a query against syntax errors. // -// MACRO is short lasting TCP connection mode for setting a macro that will be expanded +// MACRO is a short lasting TCP connection mode for setting a macro that will be expanded // later on for each individual query. // -// LIMIT is short lasting TCP connection mode for setting the maximum database size +// LIMIT is a short lasting TCP connection mode for setting the maximum database size // to limit the disk usage. +// +// FLUSH is a short lasting TCP connection mode that removes all the records in the database. +// +// RESET is a short lasting TCP connection mode that removes all the records in the database +// and resets the core into its initial state. const ( NONE ConnectionMode = iota INSERT @@ -83,6 +88,8 @@ const ( VALIDATE MACRO LIMIT + FLUSH + RESET ) type Commands int @@ -98,6 +105,8 @@ const ( CMD_MACRO string = "/macro" CMD_LIMIT string = "/limit" CMD_METADATA string = "/metadata" + CMD_FLUSH string = "/flush" + CMD_RESET string = "/reset" ) // Constants defines the database filename's prefix and file extension. @@ -607,8 +616,9 @@ func handleConnection(conn net.Conn) { switch mode { case NONE: mode = _mode - // partitionIndex -1 means there are not partitions created yet - if mode == INSERT { + switch mode { + case INSERT: + // partitionIndex -1 means there are not partitions created yet // Safely access the current partition index cs.RLock() currentPartitionIndex := cs.partitionIndex @@ -616,6 +626,12 @@ func handleConnection(conn net.Conn) { if currentPartitionIndex == -1 { newPartition() } + case FLUSH: + flush() + sendOK(conn) + case RESET: + reset() + sendOK(conn) } case INSERT: insertData(data) @@ -643,6 +659,12 @@ func handleConnection(conn net.Conn) { applyMacro(conn, data) case LIMIT: setLimit(conn, data) + case FLUSH: + flush() + sendOK(conn) + case RESET: + reset() + sendOK(conn) } } @@ -696,6 +718,12 @@ func handleMessage(message string, conn net.Conn) (mode ConnectionMode, data []b case strings.HasPrefix(message, CMD_LIMIT): mode = LIMIT + case message == CMD_FLUSH: + mode = FLUSH + + case message == CMD_RESET: + mode = RESET + default: conn.Write([]byte("Unrecognized command.\n")) } @@ -792,7 +820,7 @@ func setInsertionFilter(conn net.Conn, data []byte) { cs.insertionFilter = query cs.insertionFilterExpr = insertionFilterExpr cs.Unlock() - conn.Write([]byte(fmt.Sprintf("OK\n"))) + sendOK(conn) } else { conn.Write([]byte(fmt.Sprintf("%s\n", err.Error()))) } @@ -1360,7 +1388,7 @@ func validateQuery(conn net.Conn, data []byte) { _, err = basenine.Parse(query) if err == nil { - conn.Write([]byte(fmt.Sprintf("OK\n"))) + sendOK(conn) } else { conn.Write([]byte(fmt.Sprintf("%s\n", err.Error()))) } @@ -1384,7 +1412,7 @@ func applyMacro(conn net.Conn, data []byte) { cs.macros = basenine.AddMacro(cs.macros, macro, expanded) cs.Unlock() - conn.Write([]byte(fmt.Sprintf("OK\n"))) + sendOK(conn) } // setLimit sets a limit for the maximum database size. @@ -1400,7 +1428,7 @@ func setLimit(conn net.Conn, data []byte) { cs.partitionSizeLimit = int64(value) / 2 cs.Unlock() - conn.Write([]byte(fmt.Sprintf("OK\n"))) + sendOK(conn) } func rlimitWrite(conn net.Conn, f *os.File, rlimit uint64, offsetQueue []int64, partitionRefQueue []int64, numberOfWritten uint64) (numberOfWrittenNew uint64, err error) { @@ -1462,3 +1490,40 @@ func rlimitWrite(conn net.Conn, f *os.File, rlimit uint64, offsetQueue []int64, numberOfWrittenNew = numberOfWritten return } + +// flush removes all the records in the database +func flush() { + cs.Lock() + cs = ConcurrentSliceV0{ + version: cs.version, + partitionIndex: -1, + macros: cs.macros, + insertionFilter: cs.insertionFilter, + insertionFilterExpr: cs.insertionFilterExpr, + } + cs.Lock() + removeDatabaseFiles() + dumpCore(true, true) + cs.Unlock() + newPartition() +} + +// reset removes all the records in the database and +// resets the core's state into its initial form +func reset() { + cs.Lock() + cs = ConcurrentSliceV0{ + version: VERSION, + partitionIndex: -1, + macros: make(map[string]string), + } + cs.Lock() + removeDatabaseFiles() + dumpCore(true, true) + cs.Unlock() + newPartition() +} + +func sendOK(conn net.Conn) { + conn.Write([]byte(fmt.Sprintf("OK\n"))) +} diff --git a/server/server_test.go b/server/server_test.go index 18634c4..022e071 100644 --- a/server/server_test.go +++ b/server/server_test.go @@ -696,6 +696,104 @@ func TestServerProtocolLimitMode(t *testing.T) { removeDatabaseFiles() } +func TestServerProtocolFlushMode(t *testing.T) { + server, client := net.Pipe() + go handleConnection(server) + + client.SetWriteDeadline(time.Now().Add(1 * time.Second)) + client.Write([]byte(fmt.Sprintf("%s\n", CMD_FLUSH))) + + client.Close() + server.Close() +} + +func TestServerProtocolResetMode(t *testing.T) { + server, client := net.Pipe() + go handleConnection(server) + + client.SetWriteDeadline(time.Now().Add(1 * time.Second)) + client.Write([]byte(fmt.Sprintf("%s\n", CMD_RESET))) + + client.Close() + server.Close() +} + +func TestServerFlush(t *testing.T) { + insertionFilter := "model" + insertionFilterExpr, _, err := prepareQuery(insertionFilter) + assert.Nil(t, err) + macros := map[string]string{"foo": "bar"} + payload := `{"brand":{"name":"Chevrolet"},"model":"Camaro","year":2021}` + + cs.Lock() + cs = ConcurrentSliceV0{ + version: VERSION, + partitionIndex: -1, + macros: macros, + insertionFilter: insertionFilter, + insertionFilterExpr: insertionFilterExpr, + } + + f := newPartition() + assert.NotNil(t, f) + + insertData([]byte(payload)) + + flush() + + assert.Equal(t, cs.version, VERSION) + assert.Empty(t, cs.lastOffset) + assert.Empty(t, cs.partitionRefs) + assert.Empty(t, cs.offsets) + assert.Len(t, cs.partitions, 1) + assert.Empty(t, cs.partitionIndex) + assert.Empty(t, cs.partitionSizeLimit) + assert.Empty(t, cs.truncatedTimestamp) + assert.Empty(t, cs.removedOffsetsCounter) + assert.Empty(t, cs.metaOffsetsLength) + assert.Equal(t, cs.macros, macros) + assert.Equal(t, cs.insertionFilter, insertionFilter) + assert.Equal(t, cs.insertionFilterExpr, insertionFilterExpr) +} + +func TestServerReset(t *testing.T) { + insertionFilter := "model" + insertionFilterExpr, _, err := prepareQuery(insertionFilter) + assert.Nil(t, err) + macros := map[string]string{"foo": "bar"} + payload := `{"brand":{"name":"Chevrolet"},"model":"Camaro","year":2021}` + + cs.Lock() + cs = ConcurrentSliceV0{ + version: VERSION, + partitionIndex: -1, + macros: macros, + insertionFilter: insertionFilter, + insertionFilterExpr: insertionFilterExpr, + } + + f := newPartition() + assert.NotNil(t, f) + + insertData([]byte(payload)) + + reset() + + assert.Equal(t, cs.version, VERSION) + assert.Empty(t, cs.lastOffset) + assert.Empty(t, cs.partitionRefs) + assert.Empty(t, cs.offsets) + assert.Len(t, cs.partitions, 1) + assert.Empty(t, cs.partitionIndex) + assert.Empty(t, cs.partitionSizeLimit) + assert.Empty(t, cs.truncatedTimestamp) + assert.Empty(t, cs.removedOffsetsCounter) + assert.Empty(t, cs.metaOffsetsLength) + assert.Empty(t, cs.macros) + assert.Empty(t, cs.insertionFilter) + assert.Empty(t, cs.insertionFilterExpr) +} + // handleCommands is used by readConnection to make the server's orders // in the client to take effect. Such that the server can hang up // the connection.