Merge branch 'main' into charts-ui
serprex authored Nov 20, 2023
2 parents e3ae33a + b65913f commit 8de0c0c
Showing 12 changed files with 506 additions and 555 deletions.
98 changes: 57 additions & 41 deletions flow/connectors/eventhub/eventhub.go
@@ -126,54 +126,70 @@ func (c *EventHubConnector) processBatch(
batchPerTopic := NewHubBatches(c.hubManager)
toJSONOpts := model.NewToJSONOptions(c.config.UnnestColumns)

eventHubFlushTimeout :=
time.Duration(utils.GetEnvInt("PEERDB_EVENTHUB_FLUSH_TIMEOUT_SECONDS", 10)) *
time.Second

ticker := time.NewTicker(eventHubFlushTimeout)
defer ticker.Stop()

numRecords := 0
for record := range batch.GetRecords() {
numRecords++
json, err := record.GetItems().ToJSONWithOpts(toJSONOpts)
if err != nil {
log.WithFields(log.Fields{
"flowName": flowJobName,
}).Infof("failed to convert record to json: %v", err)
return 0, err
}
for {
select {
case record, ok := <-batch.GetRecords():
if !ok {
err := batchPerTopic.flushAllBatches(ctx, maxParallelism, flowJobName)
if err != nil {
return 0, err
}

log.WithFields(log.Fields{
"flowName": flowJobName,
}).Infof("[total] successfully sent %d records to event hub", numRecords)
return uint32(numRecords), nil
}

topicName, err := NewScopedEventhub(record.GetTableName())
if err != nil {
log.WithFields(log.Fields{
"flowName": flowJobName,
}).Infof("failed to get topic name: %v", err)
return 0, err
}
numRecords++
json, err := record.GetItems().ToJSONWithOpts(toJSONOpts)
if err != nil {
log.WithFields(log.Fields{
"flowName": flowJobName,
}).Infof("failed to convert record to json: %v", err)
return 0, err
}

err = batchPerTopic.AddEvent(ctx, topicName, json)
if err != nil {
log.WithFields(log.Fields{
"flowName": flowJobName,
}).Infof("failed to add event to batch: %v", err)
return 0, err
}
topicName, err := NewScopedEventhub(record.GetTableName())
if err != nil {
log.WithFields(log.Fields{
"flowName": flowJobName,
}).Infof("failed to get topic name: %v", err)
return 0, err
}

if numRecords%1000 == 0 {
log.WithFields(log.Fields{
"flowName": flowJobName,
}).Infof("processed %d records for sending", numRecords)
}
}
err = batchPerTopic.AddEvent(ctx, topicName, json, false)
if err != nil {
log.WithFields(log.Fields{
"flowName": flowJobName,
}).Infof("failed to add event to batch: %v", err)
return 0, err
}

log.WithFields(log.Fields{
"flowName": flowJobName,
}).Infof("processed %d records for sending", numRecords)
if numRecords%1000 == 0 {
log.WithFields(log.Fields{
"flowName": flowJobName,
}).Infof("processed %d records for sending", numRecords)
}

flushErr := batchPerTopic.flushAllBatches(ctx, maxParallelism, flowJobName)
if flushErr != nil {
return 0, flushErr
}
batchPerTopic.Clear()
case <-ticker.C:
err := batchPerTopic.flushAllBatches(ctx, maxParallelism, flowJobName)
if err != nil {
return 0, err
}

log.WithFields(log.Fields{
"flowName": flowJobName,
}).Infof("[total] successfully flushed %d records to event hub", numRecords)
return uint32(numRecords), nil
ticker.Stop()
ticker = time.NewTicker(eventHubFlushTimeout)
}
}
}

func (c *EventHubConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.SyncResponse, error) {
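
The rewritten processBatch above swaps a plain range over the record channel for a select loop that also listens on a flush ticker (sized by PEERDB_EVENTHUB_FLUSH_TIMEOUT_SECONDS, defaulting to 10 seconds), so pending batches are pushed out even when records trickle in slowly. A minimal, self-contained sketch of that pattern, using hypothetical names rather than the connector's actual types, could look like this:

package main

import (
    "fmt"
    "os"
    "strconv"
    "time"
)

// drainWithTimedFlush consumes records until the channel closes, flushing the
// pending count whenever the ticker fires or the channel is closed.
func drainWithTimedFlush(records <-chan string, flushEvery time.Duration) int {
    ticker := time.NewTicker(flushEvery)
    defer ticker.Stop()

    pending, total := 0, 0
    flush := func() {
        if pending > 0 {
            fmt.Printf("flushing %d records\n", pending)
            total += pending
            pending = 0
        }
    }

    for {
        select {
        case _, ok := <-records:
            if !ok {
                // channel closed: flush the remainder and report the total
                flush()
                return total
            }
            pending++
        case <-ticker.C:
            flush()
            // start the next flush interval from now; the connector recreates
            // the ticker instead, which has the same effect
            ticker.Reset(flushEvery)
        }
    }
}

func main() {
    // read the flush timeout from the environment with a default, in the
    // spirit of utils.GetEnvInt
    flushSeconds := 10
    if s := os.Getenv("PEERDB_EVENTHUB_FLUSH_TIMEOUT_SECONDS"); s != "" {
        if v, err := strconv.Atoi(s); err == nil {
            flushSeconds = v
        }
    }

    records := make(chan string)
    go func() {
        defer close(records)
        for i := 0; i < 25; i++ {
            records <- fmt.Sprintf("record-%d", i)
            time.Sleep(20 * time.Millisecond)
        }
    }()
    fmt.Println("total records:", drainWithTimedFlush(records, time.Duration(flushSeconds)*time.Second))
}

The ticker case only resets the timer after a flush, so an idle stream never accumulates records indefinitely, while a busy stream is still flushed when the channel finally closes.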
93 changes: 48 additions & 45 deletions flow/connectors/eventhub/hub_batches.go
@@ -2,8 +2,8 @@ package conneventhub

import (
"context"
"errors"
"fmt"
"strings"
"sync/atomic"
"time"

@@ -14,72 +14,70 @@

// multimap from ScopedEventhub to *azeventhubs.EventDataBatch
type HubBatches struct {
batches map[ScopedEventhub][]*azeventhubs.EventDataBatch
batch map[ScopedEventhub]*azeventhubs.EventDataBatch
manager *EventHubManager
}

func NewHubBatches(manager *EventHubManager) *HubBatches {
return &HubBatches{
batches: make(map[ScopedEventhub][]*azeventhubs.EventDataBatch),
batch: make(map[ScopedEventhub]*azeventhubs.EventDataBatch),
manager: manager,
}
}

func (h *HubBatches) AddEvent(ctx context.Context, name ScopedEventhub, event string) error {
batches, ok := h.batches[name]
if !ok {
batches = []*azeventhubs.EventDataBatch{}
}

if len(batches) == 0 {
func (h *HubBatches) AddEvent(
ctx context.Context,
name ScopedEventhub,
event string,
// this is true when we are retrying to add the event after the batch size was exceeded;
// it should initially be false, and then true only on the retry.
retryForBatchSizeExceed bool,
) error {
batch, ok := h.batch[name]
if !ok || batch == nil {
newBatch, err := h.manager.CreateEventDataBatch(ctx, name)
if err != nil {
return err
return fmt.Errorf("failed to create event data batch: %v", err)
}
batches = append(batches, newBatch)
batch = newBatch
h.batch[name] = batch
}

if err := tryAddEventToBatch(event, batches[len(batches)-1]); err != nil {
if strings.Contains(err.Error(), "too large for the batch") {
overflowBatch, err := h.handleBatchOverflow(ctx, name, event)
if err != nil {
return fmt.Errorf("failed to handle batch overflow: %v", err)
}
batches = append(batches, overflowBatch)
} else {
return fmt.Errorf("failed to add event data: %v", err)
}
err := tryAddEventToBatch(event, batch)
if err == nil {
// we successfully added the event to the batch, so we're done.
return nil
}

h.batches[name] = batches
return nil
}
if errors.Is(err, azeventhubs.ErrEventDataTooLarge) {
if retryForBatchSizeExceed {
// if we are already retrying, then we should just return the error
// as we have already tried to send the event to the batch.
return fmt.Errorf("[retry-failed] event too large to add to batch: %v", err)
}

func (h *HubBatches) handleBatchOverflow(
ctx context.Context,
name ScopedEventhub,
event string,
) (*azeventhubs.EventDataBatch, error) {
newBatch, err := h.manager.CreateEventDataBatch(ctx, name)
if err != nil {
return nil, err
}
if err := tryAddEventToBatch(event, newBatch); err != nil {
return nil, fmt.Errorf("failed to add event data to new batch: %v", err)
// if the event is too large, send the current batch and
// delete it from the map, so that a new batch can be created
// for the event next time.
if err := h.sendBatch(ctx, name, batch); err != nil {
return fmt.Errorf("failed to send batch: %v", err)
}
delete(h.batch, name)

return h.AddEvent(ctx, name, event, true)
} else {
return fmt.Errorf("failed to add event to batch: %v", err)
}
return newBatch, nil
}

func (h *HubBatches) Len() int {
return len(h.batches)
return len(h.batch)
}

// ForEach calls the given function for each ScopedEventhub and batch pair
func (h *HubBatches) ForEach(fn func(ScopedEventhub, *azeventhubs.EventDataBatch)) {
for name, batches := range h.batches {
for _, batch := range batches {
fn(name, batch)
}
for name, batch := range h.batch {
fn(name, batch)
}
}
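
AddEvent above now keeps a single in-flight EventDataBatch per ScopedEventhub and, when the SDK reports the event is too large for the batch, sends the current batch, drops it from the map, and retries exactly once. A rough sketch of that send-and-retry-once idea directly against the azeventhubs SDK follows; the connection string, hub name, and helper names are placeholders and error handling is trimmed, so treat it as an illustration rather than the connector's code:

package main

import (
    "context"
    "errors"
    "fmt"
    "os"

    "github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
)

// addOrFlush tries to add payload to *batch; if the batch is already full it
// sends the batch, starts a fresh one, and retries the add exactly once.
func addOrFlush(ctx context.Context, producer *azeventhubs.ProducerClient,
    batch **azeventhubs.EventDataBatch, payload []byte, isRetry bool) error {
    err := (*batch).AddEventData(&azeventhubs.EventData{Body: payload}, nil)
    if err == nil {
        return nil
    }
    if errors.Is(err, azeventhubs.ErrEventDataTooLarge) && !isRetry {
        if err := producer.SendEventDataBatch(ctx, *batch, nil); err != nil {
            return fmt.Errorf("failed to send full batch: %w", err)
        }
        newBatch, err := producer.NewEventDataBatch(ctx, nil)
        if err != nil {
            return fmt.Errorf("failed to create new batch: %w", err)
        }
        *batch = newBatch
        return addOrFlush(ctx, producer, batch, payload, true)
    }
    return fmt.Errorf("failed to add event: %w", err)
}

func main() {
    ctx := context.Background()
    // placeholder connection string and hub name, purely illustrative
    producer, err := azeventhubs.NewProducerClientFromConnectionString(
        os.Getenv("EVENTHUB_CONNECTION_STRING"), "my-hub", nil)
    if err != nil {
        panic(err)
    }
    defer producer.Close(ctx)

    batch, err := producer.NewEventDataBatch(ctx, nil)
    if err != nil {
        panic(err)
    }
    if err := addOrFlush(ctx, producer, &batch, []byte(`{"hello":"world"}`), false); err != nil {
        panic(err)
    }
    // send whatever is left in the final batch
    if err := producer.SendEventDataBatch(ctx, batch, nil); err != nil {
        panic(err)
    }
}

The retry flag plays the same role as retryForBatchSizeExceed: a second ErrEventDataTooLarge means the single event itself exceeds the batch size limit, so retrying further is pointless.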

@@ -137,14 +135,19 @@ func (h *HubBatches) flushAllBatches(
})
})

log.Infof("[sendEventBatch] successfully sent %d events in total to event hub",
err := g.Wait()
log.Infof("[flush] successfully sent %d events in total to event hub",
numEventsPushed)
return g.Wait()

// clear the batches after flushing them.
h.Clear()

return err
}

// Clear removes all batches from the HubBatches
func (h *HubBatches) Clear() {
h.batches = make(map[ScopedEventhub][]*azeventhubs.EventDataBatch)
h.batch = make(map[ScopedEventhub]*azeventhubs.EventDataBatch)
}

func tryAddEventToBatch(event string, batch *azeventhubs.EventDataBatch) error {
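
flushAllBatches above now waits on the errgroup before logging the total and then clears the per-topic map, so a flushed (or partially failed) set of batches is never reused. A small illustrative sketch of that shape, with a generic send callback and hypothetical names standing in for the connector's types, bounding parallel sends and counting pushed events atomically:

package main

import (
    "context"
    "fmt"
    "sync/atomic"

    "golang.org/x/sync/errgroup"
)

// flushAll sends every pending batch with at most maxParallelism sends in
// flight, clears the map afterwards, and returns the number of events pushed.
func flushAll(ctx context.Context, batches map[string][]string, maxParallelism int,
    send func(ctx context.Context, topic string, events []string) error) (int64, error) {
    var numEventsPushed int64

    g, gCtx := errgroup.WithContext(ctx)
    g.SetLimit(maxParallelism)
    for topic, events := range batches {
        topic, events := topic, events // capture loop variables (pre-Go 1.22)
        g.Go(func() error {
            if err := send(gCtx, topic, events); err != nil {
                return fmt.Errorf("send to %s failed: %w", topic, err)
            }
            atomic.AddInt64(&numEventsPushed, int64(len(events)))
            return nil
        })
    }

    err := g.Wait()
    // clear the batches after flushing them, mirroring Clear() above
    for topic := range batches {
        delete(batches, topic)
    }
    return atomic.LoadInt64(&numEventsPushed), err
}

func main() {
    batches := map[string][]string{
        "orders": {"e1", "e2"},
        "users":  {"e3"},
    }
    n, err := flushAll(context.Background(), batches, 2,
        func(ctx context.Context, topic string, events []string) error {
            fmt.Printf("sent %d events to %s\n", len(events), topic)
            return nil
        })
    fmt.Println("events pushed:", n, "err:", err)
}
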
4 changes: 2 additions & 2 deletions flow/connectors/s3/s3.go
@@ -94,8 +94,8 @@ func (c *S3Connector) InitializeTableSchema(req map[string]*protos.TableSchema)
}

func (c *S3Connector) Close() error {
log.Debugf("Closing s3 connector is a noop")
return nil
log.Debugf("Closing metadata store connection")
return c.pgMetadata.Close()
}

func ValidCheck(s3Client *s3.S3, bucketURL string, metadataDB *metadataStore.PostgresMetadataStore) error {
6 changes: 3 additions & 3 deletions flow/go.mod
@@ -19,7 +19,7 @@ require (
github.com/jackc/pgx/v5 v5.5.0
github.com/jmoiron/sqlx v1.3.5
github.com/joho/godotenv v1.5.1
github.com/klauspost/compress v1.17.2
github.com/klauspost/compress v1.17.3
github.com/lib/pq v1.10.9
github.com/linkedin/goavro/v2 v2.12.0
github.com/microsoft/go-mssqldb v1.6.0
@@ -32,13 +32,13 @@
github.com/uber-go/tally/v4 v4.1.10
github.com/urfave/cli/v2 v2.25.7
github.com/youmark/pkcs8 v0.0.0-20201027041543-1326539a0a0a
go.temporal.io/api v1.25.0
go.temporal.io/api v1.26.0
go.temporal.io/sdk v1.25.1
go.temporal.io/sdk/contrib/tally v0.2.0
go.uber.org/atomic v1.11.0
go.uber.org/automaxprocs v1.5.3
golang.org/x/sync v0.5.0
google.golang.org/api v0.150.0
google.golang.org/api v0.151.0
google.golang.org/genproto/googleapis/api v0.0.0-20231106174013-bbf56f31fb17
google.golang.org/grpc v1.59.0
google.golang.org/protobuf v1.31.0
6 changes: 6 additions & 0 deletions flow/go.sum
@@ -289,6 +289,8 @@ github.com/klauspost/asmfmt v1.3.2 h1:4Ri7ox3EwapiOjCki+hw14RyKk201CN4rzyCJRFLpK
github.com/klauspost/asmfmt v1.3.2/go.mod h1:AG8TuvYojzulgDAMCnYn50l/5QV3Bs/tp6j0HLHbNSE=
github.com/klauspost/compress v1.17.2 h1:RlWWUY/Dr4fL8qk9YG7DTZ7PDgME2V4csBXA8L/ixi4=
github.com/klauspost/compress v1.17.2/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
github.com/klauspost/compress v1.17.3 h1:qkRjuerhUU1EmXLYGkSH6EZL+vPSxIrYjLNAK4slzwA=
github.com/klauspost/compress v1.17.3/go.mod h1:/dCuZOvVtNoHsyb+cuJD3itjs3NbnF6KH9zAO4BDxPM=
github.com/klauspost/cpuid/v2 v2.2.6 h1:ndNyv040zDGIDh8thGkXYjnFtiN02M1PVVF+JE/48xc=
github.com/klauspost/cpuid/v2 v2.2.6/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws=
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
@@ -433,6 +435,8 @@ go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqe
go.temporal.io/api v1.5.0/go.mod h1:BqKxEJJYdxb5dqf0ODfzfMxh8UEQ5L3zKS51FiIYYkA=
go.temporal.io/api v1.25.0 h1:V6lIYuQlfmM1dc2vn6mIG5F2cY3EQ+xEjfTZ801Vpx8=
go.temporal.io/api v1.25.0/go.mod h1:LTJM9iMOIuiE5hRtym4Ne6I4rKlDGioUiscdD9D6N2Y=
go.temporal.io/api v1.26.0 h1:N4V0Daqa0qqK5+9LELSZV7clBYrwB4l33iaFfKgycPk=
go.temporal.io/api v1.26.0/go.mod h1:uVAcpQJ6bM4mxZ3m7vSHU65fHjrwy9ktGQMtsNfMZQQ=
go.temporal.io/sdk v1.12.0/go.mod h1:lSp3lH1lI0TyOsus0arnO3FYvjVXBZGi/G7DjnAnm6o=
go.temporal.io/sdk v1.25.1 h1:jC9l9vHHz5OJ7PR6OjrpYSN4+uEG0bLe5rdF9nlMSGk=
go.temporal.io/sdk v1.25.1/go.mod h1:X7iFKZpsj90BfszfpFCzLX8lwEJXbnRrl351/HyEgmU=
@@ -582,6 +586,8 @@ gonum.org/v1/gonum v0.11.0 h1:f1IJhK4Km5tBJmaiJXtk/PkL4cdVX6J+tGiM187uT5E=
gonum.org/v1/gonum v0.11.0/go.mod h1:fSG4YDCxxUZQJ7rKsQrj0gMOg00Il0Z96/qMA4bVQhA=
google.golang.org/api v0.150.0 h1:Z9k22qD289SZ8gCJrk4DrWXkNjtfvKAUo/l1ma8eBYE=
google.golang.org/api v0.150.0/go.mod h1:ccy+MJ6nrYFgE3WgRx/AMXOxOmU8Q4hSa+jjibzhxcg=
google.golang.org/api v0.151.0 h1:FhfXLO/NFdJIzQtCqjpysWwqKk8AzGWBUhMIx67cVDU=
google.golang.org/api v0.151.0/go.mod h1:ccy+MJ6nrYFgE3WgRx/AMXOxOmU8Q4hSa+jjibzhxcg=
google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM=
google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4=
google.golang.org/appengine v1.6.8 h1:IhEN5q69dyKagZPYMSdIjS2HqprW324FRQZJcGqPAsM=