Skip to content

Commit

Permalink
adds logs for eh
Browse files Browse the repository at this point in the history
  • Loading branch information
Amogh-Bharadwaj committed Sep 5, 2023
1 parent 63cd748 commit 0a0c3ed
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 13 deletions.
49 changes: 38 additions & 11 deletions flow/connectors/eventhub/eventhub.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,16 @@ func (c *EventHubConnector) PullRecords(req *model.PullRecordsRequest) (*model.R
}

func (c *EventHubConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.SyncResponse, error) {
batch := req.Records
shutdown := utils.HeartbeatRoutine(c.ctx, 1*time.Minute, func() string {
return fmt.Sprintf("syncing records to eventhub with"+
" push parallelism %d and push batch size %d",
req.PushParallelism, req.PushBatchSize)
})
defer func() {
shutdown <- true
}()

batch := req.Records
eventsPerHeartBeat := 1000
eventsPerBatch := int(req.PushBatchSize)
maxParallelism := req.PushParallelism
Expand All @@ -114,7 +122,9 @@ func (c *EventHubConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S
for i, record := range batch.Records {
json, err := record.GetItems().ToJSON()
if err != nil {
log.Errorf("failed to convert record to json: %v", err)
log.WithFields(log.Fields{
"flowName": req.FlowJobName,
}).Infof("failed to convert record to json: %v", err)
return nil, err
}

Expand All @@ -132,7 +142,8 @@ func (c *EventHubConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S
}

if (i+1)%eventsPerBatch == 0 {
err := c.sendEventBatch(batchPerTopic, maxParallelism)
err := c.sendEventBatch(batchPerTopic, maxParallelism,
req.FlowJobName)
if err != nil {
return nil, err
}
Expand All @@ -143,13 +154,16 @@ func (c *EventHubConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S

// send the remaining events.
if len(batchPerTopic) > 0 {
err := c.sendEventBatch(batchPerTopic, maxParallelism)
err := c.sendEventBatch(batchPerTopic, maxParallelism,
req.FlowJobName)
if err != nil {
return nil, err
}
}

log.Infof("[total] successfully sent %d records to event hub", len(batch.Records))
log.WithFields(log.Fields{
"flowName": req.FlowJobName,
}).Infof("[total] successfully sent %d records to event hub", len(batch.Records))

err := c.UpdateLastOffset(req.FlowJobName, batch.LastCheckPointID)
if err != nil {
Expand All @@ -164,9 +178,13 @@ func (c *EventHubConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S
}, nil
}

func (c *EventHubConnector) sendEventBatch(events map[string][]*eventhub.Event, maxParallelism int64) error {
func (c *EventHubConnector) sendEventBatch(events map[string][]*eventhub.Event,
maxParallelism int64,
flowName string) error {
if len(events) == 0 {
log.Info("no events to send")
log.WithFields(log.Fields{
"flowName": flowName,
}).Infof("no events to send")
return nil
}

Expand Down Expand Up @@ -194,14 +212,21 @@ func (c *EventHubConnector) sendEventBatch(events map[string][]*eventhub.Event,
once.Do(func() { firstErr = err })
return
}

log.WithFields(log.Fields{
"flowName": flowName,
}).Infof("obtained hub connection and now sending %d events to event hub: %s",
len(eventBatch), tblName)
err = hub.SendBatch(subCtx, eventhub.NewEventBatchIterator(eventBatch...))
if err != nil {
once.Do(func() { firstErr = err })
return
}

atomic.AddInt32(&numEventsPushed, int32(len(eventBatch)))
log.WithFields(log.Fields{
"flowName": flowName,
}).Infof("pushed %d events to event hub: %s",
numEventsPushed, tblName)
}(tblName, eventBatch)
}

Expand Down Expand Up @@ -237,7 +262,7 @@ func (c *EventHubConnector) CreateRawTable(req *protos.CreateRawTableInput) (*pr
tableMap := req.GetTableNameMapping()

for _, table := range tableMap {
err := c.ensureEventHub(c.ctx, table)
err := c.ensureEventHub(c.ctx, table, req.FlowJobName)
if err != nil {
log.WithFields(log.Fields{
"flowName": req.FlowJobName,
Expand All @@ -255,7 +280,7 @@ func (c *EventHubConnector) GetTableSchema(
panic("get table schema not implemented for event hub")
}

func (c *EventHubConnector) ensureEventHub(ctx context.Context, name string) error {
func (c *EventHubConnector) ensureEventHub(ctx context.Context, name string, flowName string) error {
hubClient, err := c.getEventHubMgmtClient()
if err != nil {
return err
Expand All @@ -282,7 +307,9 @@ func (c *EventHubConnector) ensureEventHub(ctx context.Context, name string) err
return err
}

log.Infof("event hub %s created", name)
log.WithFields(log.Fields{
"flowName": flowName,
}).Infof("event hub %s created", name)
} else {
log.Infof("event hub %s already exists", name)
}
Expand Down
8 changes: 6 additions & 2 deletions flow/connectors/eventhub/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,9 @@ func (c *EventHubConnector) GetLastOffset(jobName string) (*protos.LastSyncState
}, nil
}

log.Errorf("failed to get last offset: %v", err)
log.WithFields(log.Fields{
"flowName": jobName,
}).Errorf("failed to get last offset: %v", err)
return nil, err
}

Expand Down Expand Up @@ -155,7 +157,9 @@ func (c *EventHubConnector) UpdateLastOffset(jobName string, offset int64) error
}

// update the last offset
log.Infof("updating last offset for job `%s` to `%d`", jobName, offset)
log.WithFields(log.Fields{
"flowName": jobName,
}).Infof("updating last offset for job `%s` to `%d`", jobName, offset)
_, err = tx.Exec(c.ctx, `
INSERT INTO `+metadataSchema+`.`+lastSyncStateTableName+` (job_name, last_offset)
VALUES ($1, $2)
Expand Down

0 comments on commit 0a0c3ed

Please sign in to comment.