Skip to content

Commit

Permalink
partition more by parts in eventhub (#460)
Browse files Browse the repository at this point in the history
  • Loading branch information
iskakaushik authored Sep 30, 2023
1 parent 7b2d6dd commit 548bbb0
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 26 deletions.
51 changes: 34 additions & 17 deletions flow/connectors/eventhub/eventhub.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,19 @@ func (c *EventHubConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S
return nil
}

flushTopic := func(topic ScopedEventhub) error {
err := c.sendBatch(topic, batchPerTopic[topic])
if err != nil {
log.WithFields(log.Fields{
"flowName": req.FlowJobName,
}).Infof("failed to send event batch - %s: %v", topic, err)
return err
}

delete(batchPerTopic, topic)
return nil
}

topicName, err := NewScopedEventhub(record.GetTableName())
if err != nil {
log.WithFields(log.Fields{
Expand Down Expand Up @@ -162,7 +175,7 @@ func (c *EventHubConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S
// if the error contains `EventData could not be added because it is too large for the batch`
// then flush the batch and try again.
if strings.Contains(err.Error(), "too large for the batch") {
err := flushBatch()
err := flushTopic(topicName)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -241,9 +254,6 @@ func (c *EventHubConnector) sendEventBatch(
return nil
}

subCtx, cancel := context.WithTimeout(c.ctx, 5*time.Minute)
defer cancel()

var numEventsPushed int32
var wg sync.WaitGroup
var once sync.Once
Expand All @@ -260,20 +270,8 @@ func (c *EventHubConnector) sendEventBatch(
wg.Done()
}()

hub, err := c.hubManager.GetOrCreateHubClient(tblName)
if err != nil {
once.Do(func() { firstErr = err })
return
}

numEvents := eventBatch.NumEvents()
log.WithFields(log.Fields{
"flowName": flowName,
}).Infof("obtained hub connection and now sending %d events to event hub: %s",
numEvents, tblName)

opts := &azeventhubs.SendEventDataBatchOptions{}
err = hub.SendEventDataBatch(subCtx, eventBatch, opts)
err := c.sendBatch(tblName, eventBatch)
if err != nil {
once.Do(func() { firstErr = err })
return
Expand Down Expand Up @@ -303,6 +301,25 @@ func (c *EventHubConnector) sendEventBatch(
return nil
}

func (c *EventHubConnector) sendBatch(tblName ScopedEventhub, events *azeventhubs.EventDataBatch) error {
subCtx, cancel := context.WithTimeout(c.ctx, 5*time.Minute)
defer cancel()

hub, err := c.hubManager.GetOrCreateHubClient(tblName)
if err != nil {
return err
}

opts := &azeventhubs.SendEventDataBatchOptions{}
err = hub.SendEventDataBatch(subCtx, events, opts)
if err != nil {
return err
}

log.Infof("successfully sent %d events to event hub topic - %s", events.NumEvents(), tblName.ToString())
return nil
}

func (c *EventHubConnector) CreateRawTable(req *protos.CreateRawTableInput) (*protos.CreateRawTableOutput, error) {
// create topics for each table
// key is the source table and value is the "eh_peer.eh_topic" that ought to be used.
Expand Down
22 changes: 13 additions & 9 deletions flow/connectors/eventhub/scoped_eventhub.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,29 +6,33 @@ import (
)

type ScopedEventhub struct {
PeerName string
Eventhub string
PeerName string
Eventhub string
Identifier string
}

func NewScopedEventhub(raw string) (ScopedEventhub, error) {
// split by dot
// split by dot, the model is peername.eventhub.identifier
parts := strings.Split(raw, ".")

if len(parts) != 2 {
return ScopedEventhub{}, fmt.Errorf("invalid peer and topic name %s", raw)
if len(parts) != 3 {
return ScopedEventhub{}, fmt.Errorf("invalid scoped eventhub '%s'", raw)
}

return ScopedEventhub{
PeerName: parts[0],
Eventhub: parts[1],
PeerName: parts[0],
Eventhub: parts[1],
Identifier: parts[2],
}, nil
}

func (s ScopedEventhub) Equals(other ScopedEventhub) bool {
return s.PeerName == other.PeerName && s.Eventhub == other.Eventhub
return s.PeerName == other.PeerName &&
s.Eventhub == other.Eventhub &&
s.Identifier == other.Identifier
}

// ToString returns the string representation of the ScopedEventhub
func (s ScopedEventhub) ToString() string {
return fmt.Sprintf("%s.%s", s.PeerName, s.Eventhub)
return fmt.Sprintf("%s.%s.%s", s.PeerName, s.Eventhub, s.Identifier)
}

0 comments on commit 548bbb0

Please sign in to comment.