-
Notifications
You must be signed in to change notification settings - Fork 31
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
* Add Talaria sink, include build in Dockerfile * Change email * Revert Dockerfile * Update readme for talaria writer * Function for getting client * Update config in writer * Test on blocking client * Update config for TalariaSink * Set defaults for all except endpoint * Remove log * Update config * check for unavailable, redial and retry * Try and close existing connection before creating new client * Refactor * Update README * Change values to default
- Loading branch information
Showing
6 changed files
with
173 additions
and
2 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,18 @@ | ||
# Talaria | ||
|
||
This sink implements sending data to a second Talaria. It can be used when there is one Talaria for **event ingestion**, and needs to write data to a second Talaria, which is used **purely for querying data.** | ||
|
||
This sink can be enabled by adding the following configuration in the `storage` section: | ||
|
||
```yaml | ||
storage: | ||
compact: # enable compaction | ||
interval: 60 # compact every 60 seconds | ||
nameFunc: "s3://bucket/namefunc.lua" # file name function | ||
talaria: # sink to Talaria | ||
endpoint: "127.0.0.1:8043" # Talaria endpoint to write data to | ||
timeout: 5 # Timeout for requests to Talaria | ||
maxConcurrent: 10 # Number of concurrent requests to Talaria | ||
errorThreshold: 50 # Percentage of errors before no more requests are sent | ||
... | ||
``` |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,110 @@ | ||
package talaria | ||
|
||
import ( | ||
"context" | ||
"sync" | ||
"time" | ||
|
||
talaria "github.com/kelindar/talaria/client/golang" | ||
"github.com/kelindar/talaria/internal/encoding/key" | ||
"github.com/kelindar/talaria/internal/monitor/errors" | ||
"github.com/myteksi/hystrix-go/hystrix" | ||
"google.golang.org/grpc/codes" | ||
"google.golang.org/grpc/status" | ||
) | ||
|
||
// getClient will create a Talaria client | ||
func getClient(endpoint string, options ...talaria.Option) (*talaria.Client, error) { | ||
client, err := talaria.Dial(endpoint, options...) | ||
|
||
if err != nil { | ||
return nil, err | ||
} | ||
return client, nil | ||
} | ||
|
||
// Writer to write to TalariaDB | ||
type Writer struct { | ||
lock sync.Mutex | ||
endpoint string | ||
client *talaria.Client | ||
options []talaria.Option | ||
} | ||
|
||
// New initializes a new Talaria writer. | ||
func New(endpoint string, circuitTimeout *time.Duration, maxConcurrent *int, errorPercentThreshold *int) (*Writer, error) { | ||
|
||
var newTimeout = 5 * time.Second | ||
var newMaxConcurrent = hystrix.DefaultMaxConcurrent | ||
var newErrorPercentThreshold = hystrix.DefaultErrorPercentThreshold | ||
|
||
// Set defaults for variables if there aren't any | ||
if circuitTimeout != nil { | ||
newTimeout = *circuitTimeout * time.Second | ||
} | ||
|
||
if maxConcurrent != nil { | ||
newMaxConcurrent = *maxConcurrent | ||
} | ||
|
||
if errorPercentThreshold != nil { | ||
newErrorPercentThreshold = *errorPercentThreshold | ||
} | ||
|
||
dialOptions := []talaria.Option{} | ||
dialOptions = append(dialOptions, talaria.WithCircuit(newTimeout, newMaxConcurrent, newErrorPercentThreshold)) | ||
|
||
client, err := getClient(endpoint, dialOptions...) | ||
|
||
// Return writer with nil client | ||
if err != nil { | ||
return nil, errors.Internal("talaria: unable to create a client", err) | ||
} | ||
|
||
return &Writer{ | ||
client: client, | ||
endpoint: endpoint, | ||
options: dialOptions, | ||
}, nil | ||
} | ||
|
||
// Write will write the ORC data to Talaria | ||
func (w *Writer) Write(key key.Key, val []byte) error { | ||
|
||
// Check if client is nil | ||
if w.client == nil { | ||
if err := w.tryConnect(); err != nil { | ||
return errors.Internal("talaria: client is nil, unable to connect", err) | ||
} | ||
} | ||
|
||
// Check error status if it needs to redial | ||
if err := w.client.IngestORC(context.Background(), val); err != nil { | ||
errStatus, _ := status.FromError(err) | ||
if codes.Unavailable == errStatus.Code() { | ||
// Server unavailable, redial | ||
if err := w.tryConnect(); err != nil { | ||
return errors.Internal("talaria: unable to redial", err) | ||
} | ||
// Send again after redial | ||
if err := w.client.IngestORC(context.Background(), val); err != nil { | ||
return errors.Internal("talaria: unable to write after redial", err) | ||
} | ||
} | ||
return errors.Internal("talaria: unable to write", err) | ||
} | ||
return nil | ||
} | ||
|
||
// tryConnect will reconnect to Talaria if needed | ||
func (w *Writer) tryConnect() error { | ||
w.lock.Lock() | ||
defer w.lock.Unlock() | ||
|
||
client, err := getClient(w.endpoint, w.options...) | ||
if err != nil { | ||
return err | ||
} | ||
w.client = client | ||
return nil | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,24 @@ | ||
package talaria | ||
|
||
import ( | ||
"testing" | ||
"time" | ||
|
||
"github.com/stretchr/testify/assert" | ||
) | ||
|
||
func TestTalariaWriter(t *testing.T) { | ||
var timeout time.Duration = 5 | ||
var concurrency int = 10 | ||
var errorPercentage int = 50 | ||
c, err := New("www.talaria.net:8043", &timeout, &concurrency, &errorPercentage) | ||
|
||
// TODO: Impove test | ||
assert.Nil(t, c) | ||
assert.Error(t, err) | ||
|
||
assert.Panics(t, func() { | ||
c.Write([]byte("abc"), []byte("hello")) | ||
}) | ||
|
||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters