diff --git a/flow/connectors/eventhub/eventhub.go b/flow/connectors/eventhub/eventhub.go index 51dba78b7d..a7ab466a51 100644 --- a/flow/connectors/eventhub/eventhub.go +++ b/flow/connectors/eventhub/eventhub.go @@ -61,15 +61,8 @@ func NewEventHubConnector( func (c *EventHubConnector) Close() error { var allErrors error - // close all the eventhub connections. - err := c.hubManager.Close() - if err != nil { - log.Errorf("failed to close eventhub connections: %v", err) - allErrors = errors.Join(allErrors, err) - } - // close the postgres metadata store. - err = c.pgMetadata.Close() + err := c.pgMetadata.Close() if err != nil { log.Errorf("failed to close postgres metadata store: %v", err) allErrors = errors.Join(allErrors, err) @@ -129,46 +122,34 @@ func (c *EventHubConnector) updateLastOffset(jobName string, offset int64) error return nil } -func (c *EventHubConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.SyncResponse, error) { - shutdown := utils.HeartbeatRoutine(c.ctx, 10*time.Second, func() string { - return fmt.Sprintf("syncing records to eventhub with"+ - " push parallelism %d and push batch size %d", - req.PushParallelism, req.PushBatchSize) - }) - defer func() { - shutdown <- true - }() +func (c *EventHubConnector) processBatch( + flowJobName string, + batch *model.RecordBatch, + eventsPerBatch int, + maxParallelism int64, +) error { + ctx := context.Background() + tableNameRowsMapping := cmap.New[uint32]() - batch := req.Records eventsPerHeartBeat := 1000 - eventsPerBatch := int(req.PushBatchSize) - if eventsPerBatch <= 0 { - eventsPerBatch = 10000 - } - maxParallelism := req.PushParallelism - if maxParallelism <= 0 { - maxParallelism = 10 - } batchPerTopic := NewHubBatches(c.hubManager) toJSONOpts := model.NewToJSONOptions(c.config.UnnestColumns) - startTime := time.Now() for i, record := range batch.Records { json, err := record.GetItems().ToJSONWithOpts(toJSONOpts) if err != nil { log.WithFields(log.Fields{ - "flowName": req.FlowJobName, + "flowName": flowJobName, }).Infof("failed to convert record to json: %v", err) - return nil, err + return err } flushBatch := func() error { - err := c.sendEventBatch(batchPerTopic, maxParallelism, - req.FlowJobName, tableNameRowsMapping) + err := c.sendEventBatch(ctx, batchPerTopic, maxParallelism, flowJobName, tableNameRowsMapping) if err != nil { log.WithFields(log.Fields{ - "flowName": req.FlowJobName, + "flowName": flowJobName, }).Infof("failed to send event batch: %v", err) return err } @@ -179,45 +160,88 @@ func (c *EventHubConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S topicName, err := NewScopedEventhub(record.GetTableName()) if err != nil { log.WithFields(log.Fields{ - "flowName": req.FlowJobName, + "flowName": flowJobName, }).Infof("failed to get topic name: %v", err) - return nil, err + return err } - err = batchPerTopic.AddEvent(topicName, json) + err = batchPerTopic.AddEvent(ctx, topicName, json) if err != nil { log.WithFields(log.Fields{ - "flowName": req.FlowJobName, + "flowName": flowJobName, }).Infof("failed to add event to batch: %v", err) - return nil, err + return err } if i%eventsPerHeartBeat == 0 { - activity.RecordHeartbeat(c.ctx, fmt.Sprintf("sent %d records to hub: %s", i, topicName.ToString())) + activity.RecordHeartbeat(ctx, fmt.Sprintf("sent %d records to hub: %s", i, topicName.ToString())) } if (i+1)%eventsPerBatch == 0 { err := flushBatch() if err != nil { - return nil, err + return err } } } - // send the remaining events. if batchPerTopic.Len() > 0 { - err := c.sendEventBatch(batchPerTopic, maxParallelism, - req.FlowJobName, tableNameRowsMapping) + err := c.sendEventBatch(ctx, batchPerTopic, maxParallelism, flowJobName, tableNameRowsMapping) if err != nil { - return nil, err + return err } } + rowsSynced := len(batch.Records) log.WithFields(log.Fields{ - "flowName": req.FlowJobName, + "flowName": flowJobName, }).Infof("[total] successfully sent %d records to event hub", rowsSynced) + return nil +} + +func (c *EventHubConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.SyncResponse, error) { + shutdown := utils.HeartbeatRoutine(c.ctx, 10*time.Second, func() string { + return fmt.Sprintf("syncing records to eventhub with"+ + " push parallelism %d and push batch size %d", + req.PushParallelism, req.PushBatchSize) + }) + defer func() { + shutdown <- true + }() + + eventsPerBatch := int(req.PushBatchSize) + if eventsPerBatch <= 0 { + eventsPerBatch = 10000 + } + maxParallelism := req.PushParallelism + if maxParallelism <= 0 { + maxParallelism = 10 + } - err := c.updateLastOffset(req.FlowJobName, batch.LastCheckPointID) + var err error + startTime := time.Now() + + batch := req.Records + + // if env var PEERDB_BETA_EVENTHUB_PUSH_ASYNC=true + // we kick off processBatch in a goroutine and return immediately. + // otherwise, we block until processBatch is done. + if utils.GetEnvBool("PEERDB_BETA_EVENTHUB_PUSH_ASYNC", false) { + go func() { + err = c.processBatch(req.FlowJobName, batch, eventsPerBatch, maxParallelism) + if err != nil { + log.Errorf("[async] failed to process batch: %v", err) + } + }() + } else { + err = c.processBatch(req.FlowJobName, batch, eventsPerBatch, maxParallelism) + if err != nil { + log.Errorf("failed to process batch: %v", err) + return nil, err + } + } + + err = c.updateLastOffset(req.FlowJobName, batch.LastCheckPointID) if err != nil { log.Errorf("failed to update last offset: %v", err) return nil, err @@ -228,18 +252,19 @@ func (c *EventHubConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S return nil, err } - metrics.LogSyncMetrics(c.ctx, req.FlowJobName, int64(rowsSynced), time.Since(startTime)) - metrics.LogNormalizeMetrics(c.ctx, req.FlowJobName, int64(rowsSynced), - time.Since(startTime), int64(rowsSynced)) + rowsSynced := int64(len(batch.Records)) + metrics.LogSyncMetrics(c.ctx, req.FlowJobName, rowsSynced, time.Since(startTime)) + metrics.LogNormalizeMetrics(c.ctx, req.FlowJobName, rowsSynced, time.Since(startTime), rowsSynced) return &model.SyncResponse{ FirstSyncedCheckPointID: batch.FirstCheckPointID, LastSyncedCheckPointID: batch.LastCheckPointID, - NumRecordsSynced: int64(len(batch.Records)), - TableNameRowsMapping: tableNameRowsMapping.Items(), + NumRecordsSynced: rowsSynced, + TableNameRowsMapping: make(map[string]uint32), }, nil } func (c *EventHubConnector) sendEventBatch( + ctx context.Context, events *HubBatches, maxParallelism int64, flowName string, @@ -268,7 +293,7 @@ func (c *EventHubConnector) sendEventBatch( }() numEvents := eventBatch.NumEvents() - err := c.sendBatch(tblName, eventBatch) + err := c.sendBatch(ctx, tblName, eventBatch) if err != nil { once.Do(func() { firstErr = err }) return @@ -298,8 +323,12 @@ func (c *EventHubConnector) sendEventBatch( return nil } -func (c *EventHubConnector) sendBatch(tblName ScopedEventhub, events *azeventhubs.EventDataBatch) error { - subCtx, cancel := context.WithTimeout(c.ctx, 5*time.Minute) +func (c *EventHubConnector) sendBatch( + ctx context.Context, + tblName ScopedEventhub, + events *azeventhubs.EventDataBatch, +) error { + subCtx, cancel := context.WithTimeout(ctx, 5*time.Minute) defer cancel() hub, err := c.hubManager.GetOrCreateHubClient(tblName) diff --git a/flow/connectors/eventhub/hub_batches.go b/flow/connectors/eventhub/hub_batches.go index 97d1e568a4..652e10d45f 100644 --- a/flow/connectors/eventhub/hub_batches.go +++ b/flow/connectors/eventhub/hub_batches.go @@ -1,6 +1,7 @@ package conneventhub import ( + "context" "fmt" "strings" @@ -20,14 +21,14 @@ func NewHubBatches(manager *EventHubManager) *HubBatches { } } -func (h *HubBatches) AddEvent(name ScopedEventhub, event string) error { +func (h *HubBatches) AddEvent(ctx context.Context, name ScopedEventhub, event string) error { batches, ok := h.batches[name] if !ok { batches = []*azeventhubs.EventDataBatch{} } if len(batches) == 0 { - newBatch, err := h.manager.CreateEventDataBatch(name) + newBatch, err := h.manager.CreateEventDataBatch(ctx, name) if err != nil { return err } @@ -36,7 +37,7 @@ func (h *HubBatches) AddEvent(name ScopedEventhub, event string) error { if err := tryAddEventToBatch(event, batches[len(batches)-1]); err != nil { if strings.Contains(err.Error(), "too large for the batch") { - overflowBatch, err := h.handleBatchOverflow(name, event) + overflowBatch, err := h.handleBatchOverflow(ctx, name, event) if err != nil { return fmt.Errorf("failed to handle batch overflow: %v", err) } @@ -51,10 +52,11 @@ func (h *HubBatches) AddEvent(name ScopedEventhub, event string) error { } func (h *HubBatches) handleBatchOverflow( + ctx context.Context, name ScopedEventhub, event string, ) (*azeventhubs.EventDataBatch, error) { - newBatch, err := h.manager.CreateEventDataBatch(name) + newBatch, err := h.manager.CreateEventDataBatch(ctx, name) if err != nil { return nil, err } diff --git a/flow/connectors/eventhub/hubmanager.go b/flow/connectors/eventhub/hubmanager.go index be2681110d..1241c0ec7a 100644 --- a/flow/connectors/eventhub/hubmanager.go +++ b/flow/connectors/eventhub/hubmanager.go @@ -16,7 +16,6 @@ import ( ) type EventHubManager struct { - ctx context.Context creds *azidentity.DefaultAzureCredential // eventhub peer name -> config peerConfig cmap.ConcurrentMap[string, *protos.EventHubConfig] @@ -36,7 +35,6 @@ func NewEventHubManager( } return &EventHubManager{ - ctx: ctx, creds: creds, peerConfig: peerConfig, } @@ -69,30 +67,14 @@ func (m *EventHubManager) GetOrCreateHubClient(name ScopedEventhub) (*azeventhub return hub.(*azeventhubs.ProducerClient), nil } -func (m *EventHubManager) Close() error { - var globalErr error - m.hubs.Range(func(key, value interface{}) bool { - hub := value.(*azeventhubs.ProducerClient) - err := hub.Close(m.ctx) - if err != nil { - log.Errorf("failed to close eventhub client: %v", err) - globalErr = fmt.Errorf("failed to close eventhub client: %v", err) - return false - } - return true - }) - - return globalErr -} - -func (m *EventHubManager) CreateEventDataBatch(name ScopedEventhub) (*azeventhubs.EventDataBatch, error) { +func (m *EventHubManager) CreateEventDataBatch(ctx context.Context, name ScopedEventhub) (*azeventhubs.EventDataBatch, error) { hub, err := m.GetOrCreateHubClient(name) if err != nil { return nil, err } opts := &azeventhubs.EventDataBatchOptions{} - batch, err := hub.NewEventDataBatch(m.ctx, opts) + batch, err := hub.NewEventDataBatch(ctx, opts) if err != nil { return nil, fmt.Errorf("failed to create event data batch: %v", err) } diff --git a/flow/connectors/utils/env.go b/flow/connectors/utils/env.go new file mode 100644 index 0000000000..d3c1acfff5 --- /dev/null +++ b/flow/connectors/utils/env.go @@ -0,0 +1,30 @@ +package utils + +import ( + "os" + "strconv" +) + +// GetEnv returns the value of the environment variable with the given name +// and a boolean indicating whether the environment variable exists. +func GetEnv(name string) (string, bool) { + val, exists := os.LookupEnv(name) + return val, exists +} + +// GetEnvBool returns the value of the environment variable with the given name +// or defaultValue if the environment variable is not set or is not a valid +// boolean value. +func GetEnvBool(name string, defaultValue bool) bool { + val, ok := GetEnv(name) + if !ok { + return defaultValue + } + + b, err := strconv.ParseBool(val) + if err != nil { + return defaultValue + } + + return b +}