Fix substreaming to per table level (#51)
* Fix streaming per table

* Update documentation for compaction and streaming

* Readme for pubsub

* Change Table interface, configure empty streams for nodes and log

* Refactor so timeseries can be cast as storage streamer

Co-authored-by: tiewkeehui <[email protected]>
TiewKH and tiewkeehui authored Nov 19, 2020
1 parent ed2b058 commit 5dc6756
Showing 21 changed files with 173 additions and 127 deletions.
22 changes: 12 additions & 10 deletions README.md
@@ -46,20 +46,22 @@ If your organisation needs a reliable and scalable data ingestion platform, you

![alt text](.github/images/ingest.png)

In order to set up Talaria as an ingestion platform, you will need to enable `compaction` in the configuration, something along these lines:
In order to set up Talaria as an ingestion platform, you will need to specify a table, in this case "eventlog", and enable `compaction` in the configuration, something along these lines:

```yaml
mode: staging
env: staging
domain: "talaria-headless.default.svc.cluster.local"
storage:
dir: "/data"
compact: # enable compaction
interval: 60 # compact every 60 seconds
nameFunc: "s3://bucket/namefunc.lua" # file name function
s3: # sink to Amazon S3
region: "ap-southeast-1"
bucket: "bucket"
tables:
eventlog:
compact: # enable compaction
interval: 60 # compact every 60 seconds
nameFunc: "s3://bucket/namefunc.lua" # file name function
s3: # sink to Amazon S3
region: "ap-southeast-1"
bucket: "bucket"
...
```

@@ -79,6 +81,7 @@ Below is a list of currently supported sinks and their example configurations:
- [Microsoft Azure Blob Storage](https://azure.microsoft.com/en-us/services/storage/blobs/) using [azure sink](./internal/storage/writer/azure).
- [Minio](https://min.io/) using [s3 sink](./internal/storage/writer/s3), a custom endpoint and us-east-1 region.
- [Google Big Query](https://cloud.google.com/bigquery/) using [bigquery sink](./internal/storage/writer/bigquery).
- Talaria itself using [talaria sink](./internal/storage/writer/talaria).
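For orientation, here is a minimal sketch of how a compaction sink and a stream sit side by side under a single table after this change; it reuses only keys that appear elsewhere in this commit, and the bucket, project, and topic names are placeholders:

```yaml
tables:
  eventlog:
    compact:                 # compaction sink, flushed on an interval
      interval: 60           # compact every 60 seconds
      s3:                    # sink to Amazon S3
        region: "ap-southeast-1"
        bucket: "bucket"
    streams:                 # per-table streams, published on ingestion
      - pubsub:
          project: my-gcp-project
          topic: my-topic
          encoder: json
```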

## Hot Data Query with Talaria

@@ -107,9 +110,8 @@ readers:
storage:
dir: "/data"
tables:
timeseries:
name: eventlog
ttl: 3600
eventlog:
ttl: 3600 # data is persisted for 1 hour
hashBy: event
sortBy: time
...
4 changes: 2 additions & 2 deletions internal/config/config.go
@@ -38,7 +38,6 @@ type Config struct {
Statsd *StatsD `json:"statsd,omitempty" yaml:"statsd" env:"STATSD"`
Computed []Computed `json:"computed" yaml:"computed" env:"COMPUTED"`
K8s *K8s `json:"k8s,omitempty" yaml:"k8s" env:"K8S"`
Streams Streams `json:"streams" yaml:"streams" env:"STREAMS"`
}

type K8s struct {
@@ -54,7 +53,8 @@ type Table struct {
HashBy string `json:"hashBy,omitempty" yaml:"hashBy" env:"HASHBY"` // The column to use as key (metric), defaults to 'event'.
SortBy string `json:"sortBy,omitempty" yaml:"sortBy" env:"SORTBY"` // The column to use as time, defaults to 'tsi'.
Schema string `json:"schema" yaml:"schema" env:"SCHEMA"` // The schema of the table
Compact *Compaction `json:"compact" yaml:"compact" env:"COMPACT"`
Compact *Compaction `json:"compact" yaml:"compact" env:"COMPACT"` // The compaction configuration for the table
Streams Streams `json:"streams" yaml:"streams" env:"STREAMS"` // The streams to stream data to for data in this table
}

// Storage is the location to write the data
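For reference, a minimal sketch of how the fields of this Table struct map onto a YAML table entry after this change; the key names follow the yaml tags above and the values are placeholders:

```yaml
tables:
  eventlog:
    hashBy: event                        # Table.HashBy – key/metric column, defaults to 'event'
    sortBy: time                         # Table.SortBy – time column, defaults to 'tsi'
    schema: "s3://bucket/schema.yaml"    # Table.Schema – the schema of the table
    compact:                             # Table.Compact – compaction configuration
      interval: 60
    streams:                             # Table.Streams – moved here from the top-level config
      - pubsub:
          project: my-gcp-project
          topic: my-topic
          encoder: json
```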
24 changes: 12 additions & 12 deletions internal/config/env/configurer_test.go
@@ -78,6 +78,16 @@ tables:
interval: 300
file:
dir: "output/"
streams:
- pubsub:
project: my-gcp-project
topic: my-topic
filter: "gcs://my-bucket/my-function.lua"
encoder: json
- pubsub:
project: my-gcp-project
topic: my-topic2
encoder: json
statsd:
host: "127.0.0.1"
port: 8126
@@ -89,27 +99,17 @@ computed:
function main(input)
return json.encode(input)
end
streams:
- pubsub:
project: my-gcp-project
topic: my-topic
filter: "gcs://my-bucket/my-function.lua"
encoder: json
- pubsub:
project: my-gcp-project
topic: my-topic2
encoder: json
`)

// populate the config with the env variable
e := New("TALARIA")
assert.NoError(t, e.Configure(c))

// asserts
assert.Len(t, c.Streams, 2)
assert.Len(t, c.Tables["eventlog"].Streams, 2)
assert.Len(t, c.Computed, 1)
assert.Len(t, c.Tables, 1)

assert.Equal(t, "my-gcp-project", c.Streams[0].PubSub.Project)
assert.Equal(t, "my-gcp-project", c.Tables["eventlog"].Streams[0].PubSub.Project)
assert.Equal(t, "output/", c.Tables["eventlog"].Compact.File.Directory)
}
22 changes: 11 additions & 11 deletions internal/config/sample_config.yaml
@@ -35,17 +35,17 @@ tables:
project: "airasia-opdatalake-stg"
dataset: "eventlog"
table: "events"
streams:
- pubsub:
project: my-gcp-project
topic: my-topic
filter: "gcs://my-bucket/my-function.lua"
encoder: json
- pubsub:
project: my-gcp-project
topic: my-topic2
encoder: json
users:
hashBy: "user_id"
sortBy: "ingested_at"
schema: "gcs://k8s-default-stg-configs/ingestor/schema2.yaml"
streams:
- pubsub:
project: my-gcp-project
topic: my-topic
filter: "gcs://my-bucket/my-function.lua"
encoder: json
- pubsub:
project: my-gcp-project
topic: my-topic2
encoder: json
schema: "gcs://k8s-default-stg-configs/ingestor/schema2.yaml"
2 changes: 1 addition & 1 deletion internal/encoding/block/from_batch.go
@@ -58,7 +58,7 @@ func FromBatchBy(batch *talaria.Batch, partitionBy string, filter *typeof.Schema
row.Set(columnName, columnValue)
}

// Append computed columns and fill nulls for the row
// Append computed columns
// The only possible error comes from encoding the row; it is logged in Publish(), so we can ignore it here and continue converting the row to columns
out, _ := apply(row)

2 changes: 1 addition & 1 deletion internal/encoding/block/from_memory.go
@@ -4,9 +4,9 @@
package block

import (
"github.com/kelindar/talaria/internal/column"
"github.com/kelindar/binary"
"github.com/kelindar/binary/nocopy"
"github.com/kelindar/talaria/internal/column"
)

// FromBuffer unmarshals a block from an in-memory buffer.
5 changes: 1 addition & 4 deletions internal/server/server.go
@@ -20,7 +20,6 @@ import (
"github.com/kelindar/talaria/internal/presto"
script "github.com/kelindar/talaria/internal/scripting"
"github.com/kelindar/talaria/internal/server/thriftlog"
"github.com/kelindar/talaria/internal/storage"
"github.com/kelindar/talaria/internal/table"
talaria "github.com/kelindar/talaria/proto"
"google.golang.org/grpc"
@@ -47,13 +46,12 @@ type Storage interface {
// ------------------------------------------------------------------------------------------------------------

// New creates a new talaria server.
func New(conf config.Func, monitor monitor.Monitor, loader *script.Loader, streams storage.Streamer, tables ...table.Table) *Server {
func New(conf config.Func, monitor monitor.Monitor, loader *script.Loader, tables ...table.Table) *Server {
const maxMessageSize = 32 * 1024 * 1024 // 32 MB
server := &Server{
server: grpc.NewServer(grpc.MaxRecvMsgSize(maxMessageSize)),
conf: conf,
monitor: monitor,
streams: streams,
tables: make(map[string]table.Table),
}

@@ -90,7 +88,6 @@ type Server struct {
tables map[string]table.Table // The list of tables
computed []column.Computed // The set of computed columns
s3sqs *s3sqs.Ingress // The S3SQS Ingress (optional)
streams storage.Streamer // The streams to stream data to
}

// Listen starts listening on presto RPC & gRPC.
14 changes: 13 additions & 1 deletion internal/server/server_ingest.go
@@ -10,11 +10,15 @@ import (
"github.com/kelindar/talaria/internal/encoding/block"
"github.com/kelindar/talaria/internal/encoding/typeof"
"github.com/kelindar/talaria/internal/monitor/errors"
"github.com/kelindar/talaria/internal/storage"
"github.com/kelindar/talaria/internal/storage/stream"
"github.com/kelindar/talaria/internal/table"
talaria "github.com/kelindar/talaria/proto"
)

// applyFunc applies a transformation on a row and returns a new row
type applyFunc = func(block.Row) (block.Row, error)

const ingestErrorKey = "ingest.error"

// Ingest implements ingress.IngressServer
@@ -34,8 +38,16 @@ func (s *Server) Ingest(ctx context.Context, request *talaria.IngestRequest) (*t
filter = &schema
}

// Functions to be applied
funcs := []applyFunc{block.Transform(filter, s.computed...)}

// If table supports streaming, add publishing function
if streamer, ok := t.(storage.Streamer); ok {
funcs = append(funcs, stream.Publish(streamer, s.monitor))
}

// Partition the request for the table
blocks, err := block.FromRequestBy(request, appender.HashBy(), filter, block.Transform(filter, s.computed...), stream.Publish(s.streams, s.monitor))
blocks, err := block.FromRequestBy(request, appender.HashBy(), filter, funcs...)
if err != nil {
s.monitor.Count1(ctxTag, ingestErrorKey, "type:convert")
return nil, errors.Internal("unable to read the block", err)
17 changes: 9 additions & 8 deletions internal/storage/writer/azure/README.md
@@ -1,14 +1,15 @@
# Azure Blob Storage

This sink implements the Azure Blob Storage protocol. It can be enabled by adding the following configuration in the `storage` section:
This sink implements the Azure Blob Storage protocol. It can be enabled by adding the following configuration in the `tables` section:

```yaml
storage:
compact: # enable compaction
interval: 60 # compact every 60 seconds
nameFunc: "s3://bucket/namefunc.lua" # file name function
azure: # sink to use
container: "container-id-1" # the container ID
prefix: "" # (optional) prefix to add
tables:
eventlog:
compact: # enable compaction
interval: 60 # compact every 60 seconds
nameFunc: "s3://bucket/namefunc.lua" # file name function
azure: # sink to use
container: "container-id-1" # the container ID
prefix: "" # (optional) prefix to add
...
```
19 changes: 10 additions & 9 deletions internal/storage/writer/bigquery/README.md
@@ -1,15 +1,16 @@
# Google Big Query

This sink implements the Google Big Query protocol. It can be enabled by adding the following configuration in the `storage` section:
This sink implements the Google Big Query protocol. It can be enabled by adding the following configuration in the `tables` section:

```yaml
storage:
compact: # enable compaction
interval: 60 # compact every 60 seconds
nameFunc: "s3://bucket/namefunc.lua" # file name function
bigquery: # sink to use
project: "project-id-1" # project ID
dataset: "mydataset" # big query dataset ID
table: "mytable" # big query table ID
tables:
eventlog:
compact: # enable compaction
interval: 60 # compact every 60 seconds
nameFunc: "s3://bucket/namefunc.lua" # file name function
bigquery: # sink to use
project: "project-id-1" # project ID
dataset: "mydataset" # big query dataset ID
table: "mytable" # big query table ID
...
```
15 changes: 8 additions & 7 deletions internal/storage/writer/file/README.md
@@ -1,13 +1,14 @@
# Local File System

This sink supports writing to the local file system. It can be enabled by adding the following configuration in the `storage` section:
This sink supports writing to the local file system. It can be enabled by adding the following configuration in the `tables` section:

```yaml
storage:
compact: # enable compaction
interval: 60 # compact every 60 seconds
nameFunc: "s3://bucket/namefunc.lua" # file name function
file: # sink to use
dir: "/output" # the output directory
tables:
eventlog:
compact: # enable compaction
interval: 60 # compact every 60 seconds
nameFunc: "s3://bucket/namefunc.lua" # file name function
file: # sink to use
dir: "/output" # the output directory
...
```
17 changes: 9 additions & 8 deletions internal/storage/writer/gcs/README.md
@@ -1,14 +1,15 @@
# Google Cloud Storage

This sink implements the Google Cloud Storage protocol. It can be enabled by adding the following configuration in the `storage` section:
This sink implements the Google Cloud Storage protocol. It can be enabled by adding the following configuration in the `tables` section:

```yaml
storage:
compact: # enable compaction
interval: 60 # compact every 60 seconds
nameFunc: "s3://bucket/namefunc.lua" # file name function
gcs: # sink to use
bucket: "bucket" # the bucket to use
prefix: "dir1/" # (optional) prefix to add
tables:
eventlog:
compact: # enable compaction
interval: 60 # compact every 60 seconds
nameFunc: "s3://bucket/namefunc.lua" # file name function
gcs: # sink to use
bucket: "bucket" # the bucket to use
prefix: "dir1/" # (optional) prefix to add
...
```
15 changes: 15 additions & 0 deletions internal/storage/writer/pubsub/README.md
@@ -0,0 +1,15 @@
# Google Pub/Sub sink

This sink implements the Google Pub/Sub protocol. Unlike the other sinks, it works only for streams, not for compaction. It can be enabled by adding the following configuration in the `tables` section:

```yaml
tables:
eventlog:
streams:
- pubsub:
project: my-gcp-project
topic: my-topic
filter: "gcs://my-bucket/my-function.lua"
encoder: json
...
```
29 changes: 15 additions & 14 deletions internal/storage/writer/s3/README.md
@@ -2,21 +2,22 @@

This sink implements the Amazon S3 protocol and can be enabled for DigitalOcean Spaces and Minio using a custom `endpoint`.

This sink can be enabled by adding the following configuration in the `storage` section:
This sink can be enabled by adding the following configuration in the `tables` section:

```yaml
storage:
compact: # enable compaction
interval: 60 # compact every 60 seconds
nameFunc: "s3://bucket/namefunc.lua" # file name function
s3: # sink to Amazon S3
region: "ap-southeast-1" # the region to use
bucket: "bucket" # the bucket to use
prefix: "dir1/" # (optional) prefix to add
endpoint: "http://127.0.0.1" # (optional) custom endpoint to use
sse: "" # (optional) server-side encryption
accessKey: "" # (optional) static access key to override
secretKey: "" # (optional) static secret key to override
concurrency: 32 # (optional) upload concurrency, default=NUM_CPU
tables:
eventlog:
compact: # enable compaction
interval: 60 # compact every 60 seconds
nameFunc: "s3://bucket/namefunc.lua" # file name function
s3: # sink to Amazon S3
region: "ap-southeast-1" # the region to use
bucket: "bucket" # the bucket to use
prefix: "dir1/" # (optional) prefix to add
endpoint: "http://127.0.0.1" # (optional) custom endpoint to use
sse: "" # (optional) server-side encryption
accessKey: "" # (optional) static access key to override
secretKey: "" # (optional) static secret key to override
concurrency: 32 # (optional) upload concurrency, default=NUM_CPU
...
```