From 683a18978509b932c468d978855c0720af93cea7 Mon Sep 17 00:00:00 2001 From: Erick Pintor Date: Fri, 26 Apr 2024 08:43:33 -0300 Subject: [PATCH] ENG-6207: Resume streams after a network error (#143) --- client.go | 15 +-------------- stream.go | 57 +++++++++++++++++++++++++++++++++++++++++++++++++------ 2 files changed, 52 insertions(+), 20 deletions(-) diff --git a/client.go b/client.go index 39078bb..774c77f 100644 --- a/client.go +++ b/client.go @@ -322,20 +322,7 @@ func (c *Client) Stream(fql *Query, opts ...QueryOptFn) (*Events, error) { // Subscribe initiates a stream subscription for the given stream value. func (c *Client) Subscribe(stream Stream, opts ...StreamOptFn) (*Events, error) { - req := streamRequest{ - apiRequest: apiRequest{c.ctx, c.headers}, - Stream: stream, - } - - for _, streamOptionFn := range opts { - streamOptionFn(&req) - } - - if byteStream, err := req.do(c); err == nil { - return newEvents(byteStream), nil - } else { - return nil, err - } + return subscribe(c, stream, opts...) } // QueryIterator is a [fauna.Client] iterator for paginated queries diff --git a/stream.go b/stream.go index 42cdbfa..ecfaf0c 100644 --- a/stream.go +++ b/stream.go @@ -3,6 +3,7 @@ package fauna import ( "encoding/json" "io" + "net" ) // EventType represents a Fauna's event type. @@ -77,15 +78,43 @@ func (e *ErrEvent) Unmarshal(into any) error { // event is available or until the events iterator is closed via the // [fauna.Events.Close] method. type Events struct { - byteStream io.ReadCloser - decoder *json.Decoder + client *Client + stream Stream + byteStream io.ReadCloser + decoder *json.Decoder + lastTxnTime int64 } -func newEvents(byteStream io.ReadCloser) *Events { - return &Events{ - byteStream: byteStream, - decoder: json.NewDecoder(byteStream), +func subscribe(client *Client, stream Stream, opts ...StreamOptFn) (*Events, error) { + events := &Events{client: client, stream: stream} + if err := events.reconnect(opts...); err != nil { + return nil, err } + return events, nil +} + +func (es *Events) reconnect(opts ...StreamOptFn) error { + req := streamRequest{ + apiRequest: apiRequest{ + es.client.ctx, + es.client.headers, + }, + Stream: es.stream, + StartTS: es.lastTxnTime, + } + + for _, streamOptionFn := range opts { + streamOptionFn(&req) + } + + byteStream, err := req.do(es.client) + if err != nil { + return err + } + + es.byteStream = byteStream + es.decoder = json.NewDecoder(byteStream) + return nil } // Close gracefully closes the stream subscription. @@ -113,14 +142,30 @@ type rawEvent = struct { func (es *Events) Next() (event *Event, err error) { raw := rawEvent{} if err = es.decoder.Decode(&raw); err == nil { + es.syncTxnTime(raw.TxnTime) event, err = convertRawEvent(&raw) if _, ok := err.(*ErrEvent); ok { es.Close() // no more events are comming } + } else { + // NOTE: This code tries to resume streams on network and IO errors. It + // presume that if the service is unavailable, the reconnect call will + // fail. Automatic retries and backoff mechanisms are impleneted at the + // Client level. + if _, ok := err.(net.Error); ok || err == io.ErrUnexpectedEOF { + if err = es.reconnect(); err == nil { + event, err = es.Next() + } + } } return } +func (es *Events) syncTxnTime(txnTime int64) { + es.client.lastTxnTime.sync(txnTime) + es.lastTxnTime = txnTime +} + func convertRawEvent(raw *rawEvent) (event *Event, err error) { if raw.Error != nil { if raw.Error.Abort != nil {