Add NewMultiAccountWriter for azure sink (#66)
qiaowei-g authored Apr 22, 2021
1 parent 32cc65c commit b84ed6d
Showing 12 changed files with 277 additions and 56 deletions.
5 changes: 2 additions & 3 deletions go.mod
@@ -8,8 +8,8 @@ require (
cloud.google.com/go/pubsub v1.3.1
cloud.google.com/go/storage v1.7.0
github.com/Azure/azure-sdk-for-go v42.1.0+incompatible
github.com/Azure/go-autorest/autorest v0.10.1 // indirect
github.com/Azure/go-autorest/autorest/adal v0.8.3 // indirect
github.com/Azure/azure-storage-blob-go v0.13.0
github.com/Azure/go-autorest/autorest/azure/auth v0.5.7
github.com/Azure/go-autorest/autorest/to v0.3.0 // indirect
github.com/DataDog/datadog-go v3.7.1+incompatible
github.com/DataDog/zstd v1.4.5 // indirect
@@ -55,6 +55,5 @@ require (
google.golang.org/api v0.24.0
google.golang.org/genproto v0.0.0-20210325224202-eed09b1b5210 // indirect
google.golang.org/grpc v1.36.1
google.golang.org/grpc/cmd/protoc-gen-go-grpc v1.1.0 // indirect
gopkg.in/yaml.v2 v2.2.8
)
80 changes: 51 additions & 29 deletions go.sum

Large diffs are not rendered by default.

12 changes: 8 additions & 4 deletions internal/config/config.go
@@ -153,13 +153,17 @@ type S3Sink struct {
Concurrency int `json:"concurrency" yaml:"concurrency" env:"CONCURRENCY"` // The S3 upload concurrency
}

// AzureSink reprents a sink to Azure
// AzureSink represents a sink to Azure
type AzureSink struct {
Container string `json:"container" yaml:"container" env:"CONTAINER"` // The container name
Prefix string `json:"prefix" yaml:"prefix" env:"PREFIX"` // The prefix to add
Container string `json:"container" yaml:"container" env:"CONTAINER"` // The container name
Prefix string `json:"prefix" yaml:"prefix" env:"PREFIX"` // The prefix to add
Parallelism uint16 `json:"parallelism" yaml:"parallelism" env:"PARALLELISM"` // The BlockBlob upload parallelism
BlockSize int64 `json:"blockSize" yaml:"blockSize" env:"BLOCKSIZE"` // The Block Size for upload
BlobServiceURL string `json:"blobServiceURL" yaml:"blobServiceURL" env:"BLOBSERVICEURL"` // The blob service URL
StorageAccounts []string `json:"storageAccounts" yaml:"storageAccounts" env:"STORAGEACCOUNTS"` // The list of storage accounts
}

// BigQuerySink reprents a sink to Google Big Query
// BigQuerySink represents a sink to Google Big Query
type BigQuerySink struct {
Project string `json:"project" yaml:"project" env:"PROJECT"` // The project ID
Dataset string `json:"dataset" yaml:"dataset" env:"DATASET"` // The dataset ID
20 changes: 19 additions & 1 deletion internal/storage/writer/azure/README.md
@@ -1,6 +1,6 @@
# Azure Blob Storage

This sink implements Azure Blob Storage protocol. It can can be enabled by adding the following configuration in the `tables` section:
This sink implements Azure Blob Storage protocol. It can be enabled by adding the following configuration in the `tables` section:

```yaml
tables:
@@ -13,3 +13,21 @@ tables:
prefix: "" # (optional) prefix to add
...
```
If multiple storage accounts are configured, this sink writes to a randomly selected storage account on each write:
```yaml
tables:
  eventlog:
    compact:                               # enable compaction
      interval: 60                         # compact every 60 seconds
      nameFunc: "s3://bucket/namefunc.lua" # file name function
      azure:                               # sink to use
        container: "container-id-1"        # the container ID
        prefix: ""                         # (optional) prefix to add
        parallelism: 0                     # (optional) defaults to 5
        blockSize: 5242880                 # (optional) if not set, use default
        blobServiceURL: ""                 # (optional) if not set, use default
        storageAccounts:                   # (optional) if not set, use the single-account writer
          - "storage-account-id-0"
          - "storage-account-id-1"
        ...
```
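How these fields reach the writer is not shown in this commit; the following is a minimal sketch of the mapping, assuming a hypothetical `newAzureMultiWriter` helper, and reusing the `AzureSink` fields from `config.go` and the `NewMultiAccountWriter` signature from `azure.go`:

```go
package writer

import (
	"github.com/kelindar/talaria/internal/config"
	"github.com/kelindar/talaria/internal/monitor"
	"github.com/kelindar/talaria/internal/storage/writer/azure"
)

// newAzureMultiWriter is a hypothetical helper showing how the new AzureSink
// fields map onto NewMultiAccountWriter; the real factory wiring lives
// elsewhere in the repository and is not part of this commit.
func newAzureMultiWriter(m monitor.Monitor, cfg *config.AzureSink) (*azure.MultiAccountWriter, error) {
	return azure.NewMultiAccountWriter(
		m,
		cfg.BlobServiceURL,  // "" falls back to https://%s.blob.core.windows.net
		cfg.Container,       // e.g. "container-id-1"
		cfg.Prefix,          // optional blob name prefix
		cfg.StorageAccounts, // e.g. ["storage-account-id-0", "storage-account-id-1"]
		cfg.Parallelism,     // 0 lets azblob use its default (5, per the README)
		cfg.BlockSize,       // 0 lets azblob choose a block size
	)
}
```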
136 changes: 136 additions & 0 deletions internal/storage/writer/azure/azure.go
@@ -1,11 +1,19 @@
package azure

import (
"context"
"fmt"
"math/rand"
"net/url"
"os"
"path"
"time"

"github.com/Azure/azure-sdk-for-go/storage"
"github.com/Azure/azure-storage-blob-go/azblob"
"github.com/Azure/go-autorest/autorest/azure/auth"
"github.com/kelindar/talaria/internal/encoding/key"
"github.com/kelindar/talaria/internal/monitor"
"github.com/kelindar/talaria/internal/monitor/errors"
)

@@ -63,3 +71,131 @@ func (w *Writer) Write(key key.Key, val []byte) error {
}
return nil
}

const (
ctxTag = "azure"
tokenRefreshBuffer = 2 * time.Minute
defaultBlobServiceURL = "https://%s.blob.core.windows.net"
defaultResourceID = "https://storage.azure.com/"
)

// MultiAccountWriter represents a writer for Microsoft Azure with multiple storage accounts.
type MultiAccountWriter struct {
monitor monitor.Monitor
blobServiceURL string
prefix string
containerURLs []azblob.ContainerURL
options azblob.UploadToBlockBlobOptions
}

// NewMultiAccountWriter creates a new MultiAccountWriter.
func NewMultiAccountWriter(monitor monitor.Monitor, blobServiceURL, container, prefix string, storageAccount []string, parallelism uint16, blockSize int64) (*MultiAccountWriter, error) {
if _, present := os.LookupEnv("AZURE_AD_RESOURCE"); !present {
if err := os.Setenv("AZURE_AD_RESOURCE", defaultResourceID); err != nil {
return nil, errors.New("azure: unable to set default AZURE_AD_RESOURCE environment variable")
}
}
if blobServiceURL == "" {
blobServiceURL = defaultBlobServiceURL
}

credential, err := GetAzureStorageCredentials(monitor)
if err != nil {
return nil, errors.Internal("azure: unable to get azure storage credential", err)
}

containerURLs := make([]azblob.ContainerURL, len(storageAccount))
for i, sa := range storageAccount {
azureStoragePipeline := azblob.NewPipeline(credential, azblob.PipelineOptions{
Retry: azblob.RetryOptions{
MaxTries: 3,
},
})
u, _ := url.Parse(fmt.Sprintf(blobServiceURL, sa))
containerURLs[i] = azblob.NewServiceURL(*u, azureStoragePipeline).NewContainerURL(container)
}

return &MultiAccountWriter{
monitor: monitor,
prefix: prefix,
containerURLs: containerURLs,
options: azblob.UploadToBlockBlobOptions{
Parallelism: parallelism,
BlockSize: blockSize,
},
}, nil
}

func GetAzureStorageCredentials(monitor monitor.Monitor) (azblob.Credential, error) {
settings, err := auth.GetSettingsFromEnvironment()
if err != nil {
return nil, err
}

cc, err := settings.GetClientCredentials()
if err != nil {
return nil, err
}

spt, err := cc.ServicePrincipalToken()
if err != nil {
return nil, err
}

// Refresh the token once
if err := spt.Refresh(); err != nil {
return nil, err
}

// Token refresher function
var tokenRefresher azblob.TokenRefresher
tokenRefresher = func(credential azblob.TokenCredential) time.Duration {
monitor.Info("azure: refreshing azure storage auth token")

// Get a new token
if err := spt.Refresh(); err != nil {
monitor.Error(errors.Internal("azure: unable to refresh service principal token", err))
panic(err)
}
token := spt.Token()
credential.SetToken(token.AccessToken)

// Return the expiry time (x minutes before the token expires)
exp := token.Expires().Sub(time.Now().Add(tokenRefreshBuffer))
monitor.Info("azure: received new token, valid for %s", exp)
return exp
}

credential := azblob.NewTokenCredential("", tokenRefresher)
return credential, nil
}

// Write writes the data to a randomly selected storage account sink.
func (m *MultiAccountWriter) Write(key key.Key, val []byte) error {
start := time.Now()
ctx := context.Background()
containerURL, err := m.getContainerURL()
if err != nil {
return err
}

blobName := path.Join(m.prefix, string(key))
blockBlobURL := containerURL.NewBlockBlobURL(blobName)

_, err = azblob.UploadBufferToBlockBlob(ctx, val, blockBlobURL, m.options)
if err != nil {
m.monitor.Count1(ctxTag, "writeerror")
m.monitor.Info("failed_azure_write: %s", blobName)
return errors.Internal("azure: unable to write", err)
}
m.monitor.Histogram(ctxTag, "writelatency", float64(time.Since(start)))
return nil
}

func (m *MultiAccountWriter) getContainerURL() (*azblob.ContainerURL, error) {
if len(m.containerURLs) == 0 {
return nil, errors.New("azure: no containerURLs initialized")
}
i := rand.Intn(len(m.containerURLs))
return &m.containerURLs[i], nil
}
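A minimal usage sketch of the new writer, assuming the service principal environment variables (`AZURE_TENANT_ID`, `AZURE_CLIENT_ID`, `AZURE_CLIENT_SECRET`) are set so `GetAzureStorageCredentials` can obtain a token; the container, prefix, key and storage account names below are placeholders:

```go
package main

import (
	"log"

	"github.com/kelindar/talaria/internal/encoding/key"
	"github.com/kelindar/talaria/internal/monitor"
	"github.com/kelindar/talaria/internal/monitor/logging"
	"github.com/kelindar/talaria/internal/monitor/statsd"
	"github.com/kelindar/talaria/internal/storage/writer/azure"
)

func main() {
	m := monitor.New(logging.NewStandard(), statsd.NewNoop(), "talaria", "dev")

	// An empty blobServiceURL falls back to https://%s.blob.core.windows.net.
	w, err := azure.NewMultiAccountWriter(m, "", "eventlog", "prefix",
		[]string{"storage-account-id-0", "storage-account-id-1"}, 0, 0)
	if err != nil {
		log.Fatal(err)
	}

	// Each Write picks one of the configured storage accounts at random.
	if err := w.Write(key.Key("2021/04/22/file.orc"), []byte("payload")); err != nil {
		log.Fatal(err)
	}
}
```

Note that `getContainerURL` draws from `math/rand`'s global source, so the selection sequence is deterministic across process restarts unless that source is seeded.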
16 changes: 16 additions & 0 deletions internal/storage/writer/azure/azure_test.go
@@ -2,8 +2,12 @@ package azure

import (
"os"
"strings"
"testing"

"github.com/kelindar/talaria/internal/monitor"
"github.com/kelindar/talaria/internal/monitor/logging"
"github.com/kelindar/talaria/internal/monitor/statsd"
"github.com/stretchr/testify/assert"
)

@@ -20,3 +24,15 @@ func TestWriter(t *testing.T) {
assert.NotNil(t, err)
})
}

func TestMultiAccountWriter(t *testing.T) {
os.Setenv("AZURE_TENANT_ID", "xyz")
os.Setenv("AZURE_CLIENT_ID", "xyz")
os.Setenv("AZURE_CLIENT_SECRET", "xyz")

c, err := NewMultiAccountWriter(monitor.New(logging.NewStandard(), statsd.NewNoop(), "x", "x"),
defaultBlobServiceURL, "container", "x", []string{"x-0"}, 0, 0)

assert.Nil(t, c)
assert.True(t, strings.Contains(err.Error(), "azure: unable to get azure storage credential"))
}
13 changes: 8 additions & 5 deletions internal/storage/writer/multi/multi.go
@@ -6,6 +6,7 @@ import (
"github.com/grab/async"
"github.com/kelindar/talaria/internal/encoding/block"
"github.com/kelindar/talaria/internal/encoding/key"
"golang.org/x/sync/errgroup"
)

// SubWriter represents the sub-writer
@@ -42,13 +43,15 @@ func New(writers ...SubWriter) *Writer {

// Write writes the data to the sink.
func (w *Writer) Write(key key.Key, val []byte) error {
eg := new(errgroup.Group)
for _, w := range w.writers {
if err := w.Write(key, val); err != nil {
return err
}
w := w
eg.Go(func() error {
return w.Write(key, val)
})
}

return nil
// Wait blocks until all writers have finished and returns the first non-nil error (if any)
return eg.Wait()
}

// Run launches the asynchronous infinite loop for streamers to start streaming data
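With this change, `Write` fans out to every sub-writer concurrently and returns once all of them finish, surfacing the first non-nil error. A small, self-contained sketch of that errgroup behaviour (the writer functions below are stand-ins, not talaria code):

```go
package main

import (
	"errors"
	"fmt"

	"golang.org/x/sync/errgroup"
)

func main() {
	writers := []func() error{
		func() error { return nil },                       // e.g. S3 sink succeeds
		func() error { return errors.New("azure: down") }, // e.g. Azure sink fails
		func() error { return nil },                       // e.g. BigQuery sink succeeds
	}

	eg := new(errgroup.Group)
	for _, w := range writers {
		w := w // capture the loop variable, mirroring the change in multi.go
		eg.Go(func() error { return w() })
	}

	// Wait blocks until every writer returns; the result is the first non-nil error.
	fmt.Println(eg.Wait()) // prints: azure: down
}
```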
7 changes: 4 additions & 3 deletions internal/storage/writer/multi/multi_test.go
@@ -2,6 +2,7 @@ package multi

import (
"context"
"sync/atomic"
"testing"

"github.com/grab/async"
@@ -35,15 +36,15 @@ func (w *MockWriterFull) Run(ctx context.Context) (async.Task, error) {
}

func TestMulti(t *testing.T) {
var count int
var count int64
sub := MockWriter(func(key key.Key, val []byte) error {
count++
atomic.AddInt64(&count, 1)
return nil
})

multiWriter := New(sub, sub, sub)
assert.NoError(t, multiWriter.Write(nil, nil))
assert.Equal(t, 3, count)
assert.EqualValues(t, 3, count)

mock1 := MockWriterFull{Count: 0}
mock2 := MockWriterFull{Count: 5}
15 changes: 12 additions & 3 deletions internal/storage/writer/s3/s3.go
@@ -5,19 +5,23 @@ package s3

import (
"bytes"
"github.com/aws/aws-sdk-go/aws/credentials"
"path"
"runtime"
"strings"
"time"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/aws/aws-sdk-go/service/s3/s3manager"
"github.com/kelindar/talaria/internal/encoding/key"
"github.com/kelindar/talaria/internal/monitor"
"github.com/kelindar/talaria/internal/monitor/errors"
)

const ctxTag = "s3"

// Uploader uploads to underlying backend
//go:generate mockery -name=S3Uploader -case underscore -testonly -inpkg
type Uploader interface {
@@ -26,14 +30,15 @@

// Writer represents a writer for Amazon S3 and compatible storages.
type Writer struct {
monitor monitor.Monitor
uploader Uploader
bucket string
prefix string
sse string
}

// New initializes a new S3 writer.
func New(bucket, prefix, region, endpoint, sse, access, secret string, concurrency int) (*Writer, error) {
func New(monitor monitor.Monitor, bucket, prefix, region, endpoint, sse, access, secret string, concurrency int) (*Writer, error) {
if concurrency == 0 {
concurrency = runtime.NumCPU()
}
@@ -52,6 +57,7 @@ func New(bucket, prefix, region, endpoint, sse, access, secret string, concurren

client := s3.New(session.New(), config)
return &Writer{
monitor: monitor,
uploader: s3manager.NewUploaderWithClient(client, func(u *s3manager.Uploader) {
u.Concurrency = concurrency
}),
@@ -63,9 +69,10 @@ func New(bucket, prefix, region, endpoint, sse, access, secret string, concurren

// Write creates an object in the writer's S3 bucket, keyed by the configured prefix and the given key, with value val
func (w *Writer) Write(key key.Key, val []byte) error {
start := time.Now()
uploadInput := &s3manager.UploadInput{
Bucket: aws.String(w.bucket),
Body: bytes.NewBuffer(val),
Body: bytes.NewReader(val),
Key: aws.String(path.Join(w.prefix, string(key))),
}

@@ -76,8 +83,10 @@ func (w *Writer) Write(key key.Key, val []byte) error {

// Upload to S3
if _, err := w.uploader.Upload(uploadInput); err != nil {
w.monitor.Count1(ctxTag, "writeerror")
return errors.Internal("s3: unable to write", err)
}
w.monitor.Histogram(ctxTag, "writelatency", float64(time.Since(start)))
return nil
}

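With the monitor threaded through, the S3 writer now emits the same `writeerror` counter and `writelatency` histogram as the Azure writers. A rough, hypothetical call site for the updated `New` signature (bucket, prefix, region and monitor arguments are placeholder values; the endpoint, SSE and credential strings are simply left empty here):

```go
package main

import (
	"log"

	"github.com/kelindar/talaria/internal/encoding/key"
	"github.com/kelindar/talaria/internal/monitor"
	"github.com/kelindar/talaria/internal/monitor/logging"
	"github.com/kelindar/talaria/internal/monitor/statsd"
	"github.com/kelindar/talaria/internal/storage/writer/s3"
)

func main() {
	m := monitor.New(logging.NewStandard(), statsd.NewNoop(), "talaria", "dev")

	// Concurrency 0 falls back to runtime.NumCPU() inside New.
	w, err := s3.New(m, "my-bucket", "events", "ap-southeast-1", "", "", "", "", 0)
	if err != nil {
		log.Fatal(err)
	}

	// On success a "writelatency" histogram is recorded; on failure "writeerror" is counted.
	if err := w.Write(key.Key("2021/04/22/file.orc"), []byte("payload")); err != nil {
		log.Fatal(err)
	}
}
```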