From 1d356dc0f0d05adc4a47a0a73f06891d794be678 Mon Sep 17 00:00:00 2001 From: TiewKH Date: Thu, 15 Oct 2020 09:25:21 +0800 Subject: [PATCH] Added TalariaSink (#40) * Add Talaria sink, include build in Dockerfile * Change email * Revert Dockerfile * Update readme for talaria writer * Function for getting client * Update config in writer * Test on blocking client * Update config for TalariaSink * Set defaults for all except endpoint * Remove log * Update config * check for unavailable, redial and retry * Try and close existing connection before creating new client * Refactor * Update README * Change values to default --- internal/config/config.go | 9 ++ internal/storage/writer/talaria/README.md | 18 +++ internal/storage/writer/talaria/talaria.go | 110 ++++++++++++++++++ .../storage/writer/talaria/talaria_test.go | 24 ++++ internal/storage/writer/writer.go | 12 +- internal/storage/writer/writer_test.go | 2 +- 6 files changed, 173 insertions(+), 2 deletions(-) create mode 100644 internal/storage/writer/talaria/README.md create mode 100644 internal/storage/writer/talaria/talaria.go create mode 100644 internal/storage/writer/talaria/talaria_test.go diff --git a/internal/config/config.go b/internal/config/config.go index 03adb328..326e6b4d 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -137,6 +137,7 @@ type Compaction struct { BigQuery *BigQuerySink `json:"bigquery" yaml:"bigquery" env:"BIGQUERY"` // The Big Query writer configuration GCS *GCSSink `json:"gcs" yaml:"gcs" env:"GCS"` // The Google Cloud Storage writer configuration File *FileSink `json:"file" yaml:"file" env:"FILE"` // The local file system writer configuration + Talaria *TalariaSink `json:"talaria" yaml:"talaria" env:"TALARIA"` // The Talaria writer configuration } // S3Sink represents a sink for AWS S3 and compatible stores. 
@@ -175,6 +176,14 @@ type FileSink struct { Directory string `json:"dir" yaml:"dir" env:"DIR"` } +// TalariaSink represents a sink to an instance of Talaria +type TalariaSink struct { + Endpoint string `json:"endpoint" yaml:"endpoint" env:"ENDPOINT"` // The second Talaria endpoint + CircuitTimeout *time.Duration `json:"timeout" yaml:"timeout" env:"TIMEOUT"` // The timeout (in seconds) for requests to the second Talaria + MaxConcurrent *int `json:"concurrency" yaml:"concurrency" env:"CONCURRENCY"` // The number of concurrent requests permissible + ErrorPercentThreshold *int `json:"errorThreshold" yaml:"errorThreshold" env:"ERROR_THRESHOLD"` // The percentage of failed requests tolerated +} + // Func represents a config function type Func func() *Config diff --git a/internal/storage/writer/talaria/README.md b/internal/storage/writer/talaria/README.md new file mode 100644 index 00000000..64f19c9f --- /dev/null +++ b/internal/storage/writer/talaria/README.md @@ -0,0 +1,18 @@ +# Talaria + +This sink implements sending data to a second Talaria. It can be used when there is one Talaria for **event ingestion**, and needs to write data to a second Talaria, which is used **purely for querying data.** + +This sink can be enabled by adding the following configuration in the `storage` section: + +```yaml +storage: + compact: # enable compaction + interval: 60 # compact every 60 seconds + nameFunc: "s3://bucket/namefunc.lua" # file name function + talaria: # sink to Talaria + endpoint: "127.0.0.1:8043" # Talaria endpoint to write data to + timeout: 5 # Timeout (in seconds) for requests to Talaria + concurrency: 10 # Number of concurrent requests to Talaria + errorThreshold: 50 # Percentage of errors before no more requests are sent +...
+``` diff --git a/internal/storage/writer/talaria/talaria.go b/internal/storage/writer/talaria/talaria.go new file mode 100644 index 00000000..2a30344f --- /dev/null +++ b/internal/storage/writer/talaria/talaria.go @@ -0,0 +1,110 @@ +package talaria + +import ( + "context" + "sync" + "time" + + talaria "github.com/kelindar/talaria/client/golang" + "github.com/kelindar/talaria/internal/encoding/key" + "github.com/kelindar/talaria/internal/monitor/errors" + "github.com/myteksi/hystrix-go/hystrix" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +// getClient dials the given endpoint with the supplied options and returns the resulting Talaria client +func getClient(endpoint string, options ...talaria.Option) (*talaria.Client, error) { + client, err := talaria.Dial(endpoint, options...) + + if err != nil { + return nil, err + } + return client, nil +} + +// Writer writes data to a Talaria endpoint; it keeps the endpoint and dial options so it can reconnect +type Writer struct { + lock sync.Mutex + endpoint string + client *talaria.Client + options []talaria.Option +} + +// New initializes a new Talaria writer; nil settings fall back to a 5 second timeout and the hystrix defaults. +func New(endpoint string, circuitTimeout *time.Duration, maxConcurrent *int, errorPercentThreshold *int) (*Writer, error) { + + var newTimeout = 5 * time.Second + var newMaxConcurrent = hystrix.DefaultMaxConcurrent + var newErrorPercentThreshold = hystrix.DefaultErrorPercentThreshold + + // Override the defaults with any settings that were provided; the timeout value is multiplied by time.Second + if circuitTimeout != nil { + newTimeout = *circuitTimeout * time.Second + } + + if maxConcurrent != nil { + newMaxConcurrent = *maxConcurrent + } + + if errorPercentThreshold != nil { + newErrorPercentThreshold = *errorPercentThreshold + } + + dialOptions := []talaria.Option{} + dialOptions = append(dialOptions, talaria.WithCircuit(newTimeout, newMaxConcurrent, newErrorPercentThreshold)) + + client, err := getClient(endpoint, dialOptions...)
+ + // Return writer with nil client + if err != nil { + return nil, errors.Internal("talaria: unable to create a client", err) + } + + return &Writer{ + client: client, + endpoint: endpoint, + options: dialOptions, + }, nil +} + +// Write sends the ORC data to Talaria, redialing and retrying once if the server reports Unavailable +func (w *Writer) Write(key key.Key, val []byte) error { + + // Connect first if there is no client yet + if w.client == nil { + if err := w.tryConnect(); err != nil { + return errors.Internal("talaria: client is nil, unable to connect", err) + } + } + + // Only a gRPC Unavailable status is worth a redial; any other failure is returned right away + if err := w.client.IngestORC(context.Background(), val); err != nil { + errStatus, _ := status.FromError(err) + if codes.Unavailable != errStatus.Code() { + return errors.Internal("talaria: unable to write", err) + } + // Server unavailable, redial + if err := w.tryConnect(); err != nil { + return errors.Internal("talaria: unable to redial", err) + } + // Send again after redial; a successful retry falls through to return nil instead of an error + if err := w.client.IngestORC(context.Background(), val); err != nil { + return errors.Internal("talaria: unable to write after redial", err) + } + } + return nil +} + +// tryConnect will reconnect to Talaria if needed +func (w *Writer) tryConnect() error { + w.lock.Lock() + defer w.lock.Unlock() + + client, err := getClient(w.endpoint, w.options...)
+ if err != nil { + return err + } + w.client = client + return nil +} diff --git a/internal/storage/writer/talaria/talaria_test.go b/internal/storage/writer/talaria/talaria_test.go new file mode 100644 index 00000000..8cb46181 --- /dev/null +++ b/internal/storage/writer/talaria/talaria_test.go @@ -0,0 +1,24 @@ +package talaria + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +func TestTalariaWriter(t *testing.T) { + var timeout time.Duration = 5 + var concurrency int = 10 + var errorPercentage int = 50 + c, err := New("www.talaria.net:8043", &timeout, &concurrency, &errorPercentage) + + // TODO: Improve test; it currently expects New to fail for this endpoint and Write on the nil writer to panic + assert.Nil(t, c) + assert.Error(t, err) + + assert.Panics(t, func() { + c.Write([]byte("abc"), []byte("hello")) + }) + +} diff --git a/internal/storage/writer/writer.go b/internal/storage/writer/writer.go index adb33bb7..8ab134e6 100644 --- a/internal/storage/writer/writer.go +++ b/internal/storage/writer/writer.go @@ -11,7 +11,7 @@ import ( "github.com/kelindar/talaria/internal/encoding/typeof" "github.com/kelindar/talaria/internal/monitor" "github.com/kelindar/talaria/internal/monitor/errors" - "github.com/kelindar/talaria/internal/scripting" + script "github.com/kelindar/talaria/internal/scripting" "github.com/kelindar/talaria/internal/storage" "github.com/kelindar/talaria/internal/storage/compact" "github.com/kelindar/talaria/internal/storage/flush" @@ -22,6 +22,7 @@ import ( "github.com/kelindar/talaria/internal/storage/writer/multi" "github.com/kelindar/talaria/internal/storage/writer/noop" "github.com/kelindar/talaria/internal/storage/writer/s3" + "github.com/kelindar/talaria/internal/storage/writer/talaria" ) var seed = maphash.MakeSeed() @@ -109,6 +110,15 @@ func newWriter(config *config.Compaction) (flush.Writer, error) { writers = append(writers, w) } + // Configure Talaria writer if present + if config.Talaria != nil { + w, err := talaria.New(config.Talaria.Endpoint, config.Talaria.CircuitTimeout,
config.Talaria.MaxConcurrent, config.Talaria.ErrorPercentThreshold) + if err != nil { + return nil, err + } + writers = append(writers, w) + } + // If no writers were configured, error out if len(writers) == 0 { return noop.New(), errors.New("compact: writer was not configured") diff --git a/internal/storage/writer/writer_test.go b/internal/storage/writer/writer_test.go index fccfa229..62ac13d1 100644 --- a/internal/storage/writer/writer_test.go +++ b/internal/storage/writer/writer_test.go @@ -7,7 +7,7 @@ import ( "github.com/kelindar/talaria/internal/monitor" "github.com/kelindar/talaria/internal/monitor/logging" "github.com/kelindar/talaria/internal/monitor/statsd" - "github.com/kelindar/talaria/internal/scripting" + script "github.com/kelindar/talaria/internal/scripting" "github.com/kelindar/talaria/internal/storage/disk" "github.com/stretchr/testify/assert" )