Skip to content

Commit

Permalink
Don't wait in Fetch if the query is finished but limit is not reached
Browse files Browse the repository at this point in the history
  • Loading branch information
M. Mert Yildiran committed May 9, 2022
1 parent c66c4e1 commit f27eab0
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 11 deletions.
30 changes: 19 additions & 11 deletions client/go/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ func (c *Connection) Query(leftOff string, query string, data chan []byte, meta
query = escapeLineFeed(query)

var wg sync.WaitGroup
go readConnection(&wg, c, data, meta)
go readConnection(&wg, c, data, meta, false, nil)
wg.Add(1)

err = c.SendText(CMD_QUERY)
Expand Down Expand Up @@ -133,7 +133,7 @@ func Single(host string, port string, id string, query string) (data []byte, err
ret := make(chan []byte)

var wg sync.WaitGroup
go readConnection(&wg, c, ret, nil)
go readConnection(&wg, c, ret, nil, false, nil)
wg.Add(1)

err = c.SendText(CMD_SINGLE)
Expand Down Expand Up @@ -172,9 +172,10 @@ func Fetch(host string, port string, leftOff string, direction int, query string

dataChan := make(chan []byte)
metaChan := make(chan []byte)
closeChan := make(chan bool)

var wg sync.WaitGroup
go readConnection(&wg, c, dataChan, metaChan)
go readConnection(&wg, c, dataChan, metaChan, true, closeChan)
wg.Add(1)

err = c.SendText(CMD_FETCH)
Expand Down Expand Up @@ -233,6 +234,9 @@ func Fetch(host string, port string, leftOff string, direction int, query string
c.Close()
return
}
case <-closeChan:
c.Close()
return
case <-afterCh:
c.Close()
return
Expand All @@ -254,7 +258,7 @@ func Validate(host string, port string, query string) (err error) {
ret := make(chan []byte)

var wg sync.WaitGroup
go readConnection(&wg, c, ret, nil)
go readConnection(&wg, c, ret, nil, false, nil)
wg.Add(1)

err = c.SendText(CMD_VALIDATE)
Expand Down Expand Up @@ -290,7 +294,7 @@ func Macro(host string, port string, macro string, expanded string) (err error)
ret := make(chan []byte)

var wg sync.WaitGroup
go readConnection(&wg, c, ret, nil)
go readConnection(&wg, c, ret, nil, false, nil)
wg.Add(1)

err = c.SendText(CMD_MACRO)
Expand Down Expand Up @@ -328,7 +332,7 @@ func InsertionFilter(host string, port string, query string) (err error) {
ret := make(chan []byte)

var wg sync.WaitGroup
go readConnection(&wg, c, ret, nil)
go readConnection(&wg, c, ret, nil, false, nil)
wg.Add(1)

err = c.SendText(CMD_INSERTION_FILTER)
Expand Down Expand Up @@ -365,7 +369,7 @@ func Limit(host string, port string, limit int64) (err error) {
ret := make(chan []byte)

var wg sync.WaitGroup
go readConnection(&wg, c, ret, nil)
go readConnection(&wg, c, ret, nil, false, nil)
wg.Add(1)

err = c.SendText(CMD_LIMIT)
Expand Down Expand Up @@ -400,7 +404,7 @@ func Flush(host string, port string) (err error) {
ret := make(chan []byte)

var wg sync.WaitGroup
go readConnection(&wg, c, ret, nil)
go readConnection(&wg, c, ret, nil, false, nil)
wg.Add(1)

err = c.SendText(CMD_FLUSH)
Expand Down Expand Up @@ -430,7 +434,7 @@ func Reset(host string, port string) (err error) {
ret := make(chan []byte)

var wg sync.WaitGroup
go readConnection(&wg, c, ret, nil)
go readConnection(&wg, c, ret, nil, false, nil)
wg.Add(1)

err = c.SendText(CMD_RESET)
Expand All @@ -450,7 +454,7 @@ func Reset(host string, port string) (err error) {

// readConnection is a Goroutine that recieves messages from the TCP connection
// and sends them to a []byte channel provided by the data parameter.
func readConnection(wg *sync.WaitGroup, c *Connection, data chan []byte, meta chan []byte) {
func readConnection(wg *sync.WaitGroup, c *Connection, data chan []byte, meta chan []byte, fetching bool, close chan bool) {
defer wg.Done()
for {
scanner := bufio.NewScanner(c)
Expand All @@ -477,7 +481,11 @@ func readConnection(wg *sync.WaitGroup, c *Connection, data chan []byte, meta ch
meta <- b
continue
case CloseConnection:
log.Printf("%s client: Server is leaving. Hanging up.\n", SoftwareName)
if fetching {
close <- true
} else {
log.Printf("%s client: Server is leaving. Hanging up.\n", SoftwareName)
}
break
}

Expand Down
2 changes: 2 additions & 0 deletions server/lib/storages/native.go
Original file line number Diff line number Diff line change
Expand Up @@ -821,6 +821,8 @@ func (storage *nativeStorage) Fetch(conn net.Conn, leftOff string, direction str
numberOfWritten++
}
}

basenine.SendClose(conn)
return
}

Expand Down

0 comments on commit f27eab0

Please sign in to comment.