diff --git a/flow/connectors/eventhub/eventhub.go b/flow/connectors/eventhub/eventhub.go index 8fd2c112a2..7c44aa762d 100644 --- a/flow/connectors/eventhub/eventhub.go +++ b/flow/connectors/eventhub/eventhub.go @@ -132,6 +132,19 @@ func (c *EventHubConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S return nil } + flushTopic := func(topic ScopedEventhub) error { + err := c.sendBatch(topic, batchPerTopic[topic]) + if err != nil { + log.WithFields(log.Fields{ + "flowName": req.FlowJobName, + }).Infof("failed to send event batch - %s: %v", topic, err) + return err + } + + delete(batchPerTopic, topic) + return nil + } + topicName, err := NewScopedEventhub(record.GetTableName()) if err != nil { log.WithFields(log.Fields{ @@ -162,7 +175,7 @@ func (c *EventHubConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S // if the error contains `EventData could not be added because it is too large for the batch` // then flush the batch and try again. if strings.Contains(err.Error(), "too large for the batch") { - err := flushBatch() + err := flushTopic(topicName) if err != nil { return nil, err } @@ -241,9 +254,6 @@ func (c *EventHubConnector) sendEventBatch( return nil } - subCtx, cancel := context.WithTimeout(c.ctx, 5*time.Minute) - defer cancel() - var numEventsPushed int32 var wg sync.WaitGroup var once sync.Once @@ -260,20 +270,8 @@ func (c *EventHubConnector) sendEventBatch( wg.Done() }() - hub, err := c.hubManager.GetOrCreateHubClient(tblName) - if err != nil { - once.Do(func() { firstErr = err }) - return - } - numEvents := eventBatch.NumEvents() - log.WithFields(log.Fields{ - "flowName": flowName, - }).Infof("obtained hub connection and now sending %d events to event hub: %s", - numEvents, tblName) - - opts := &azeventhubs.SendEventDataBatchOptions{} - err = hub.SendEventDataBatch(subCtx, eventBatch, opts) + err := c.sendBatch(tblName, eventBatch) if err != nil { once.Do(func() { firstErr = err }) return @@ -303,6 +301,25 @@ func (c *EventHubConnector) sendEventBatch( return nil } +func (c *EventHubConnector) sendBatch(tblName ScopedEventhub, events *azeventhubs.EventDataBatch) error { + subCtx, cancel := context.WithTimeout(c.ctx, 5*time.Minute) + defer cancel() + + hub, err := c.hubManager.GetOrCreateHubClient(tblName) + if err != nil { + return err + } + + opts := &azeventhubs.SendEventDataBatchOptions{} + err = hub.SendEventDataBatch(subCtx, events, opts) + if err != nil { + return err + } + + log.Infof("successfully sent %d events to event hub topic - %s", events.NumEvents(), tblName.ToString()) + return nil +} + func (c *EventHubConnector) CreateRawTable(req *protos.CreateRawTableInput) (*protos.CreateRawTableOutput, error) { // create topics for each table // key is the source table and value is the "eh_peer.eh_topic" that ought to be used. diff --git a/flow/connectors/eventhub/scoped_eventhub.go b/flow/connectors/eventhub/scoped_eventhub.go index d67a91ad15..048ce595c8 100644 --- a/flow/connectors/eventhub/scoped_eventhub.go +++ b/flow/connectors/eventhub/scoped_eventhub.go @@ -6,29 +6,33 @@ import ( ) type ScopedEventhub struct { - PeerName string - Eventhub string + PeerName string + Eventhub string + Identifier string } func NewScopedEventhub(raw string) (ScopedEventhub, error) { - // split by dot + // split by dot, the model is peername.eventhub.identifier parts := strings.Split(raw, ".") - if len(parts) != 2 { - return ScopedEventhub{}, fmt.Errorf("invalid peer and topic name %s", raw) + if len(parts) != 3 { + return ScopedEventhub{}, fmt.Errorf("invalid scoped eventhub '%s'", raw) } return ScopedEventhub{ - PeerName: parts[0], - Eventhub: parts[1], + PeerName: parts[0], + Eventhub: parts[1], + Identifier: parts[2], }, nil } func (s ScopedEventhub) Equals(other ScopedEventhub) bool { - return s.PeerName == other.PeerName && s.Eventhub == other.Eventhub + return s.PeerName == other.PeerName && + s.Eventhub == other.Eventhub && + s.Identifier == other.Identifier } // ToString returns the string representation of the ScopedEventhub func (s ScopedEventhub) ToString() string { - return fmt.Sprintf("%s.%s", s.PeerName, s.Eventhub) + return fmt.Sprintf("%s.%s.%s", s.PeerName, s.Eventhub, s.Identifier) }