diff --git a/flow/connectors/eventhub/eventhub.go b/flow/connectors/eventhub/eventhub.go index c4fba1e2b0..e6e1e050ff 100644 --- a/flow/connectors/eventhub/eventhub.go +++ b/flow/connectors/eventhub/eventhub.go @@ -104,8 +104,16 @@ func (c *EventHubConnector) PullRecords(req *model.PullRecordsRequest) (*model.R } func (c *EventHubConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.SyncResponse, error) { - batch := req.Records + shutdown := utils.HeartbeatRoutine(c.ctx, 1*time.Minute, func() string { + return fmt.Sprintf("syncing records to eventhub with"+ + " push parallelism %d and push batch size %d", + req.PushParallelism, req.PushBatchSize) + }) + defer func() { + shutdown <- true + }() + batch := req.Records eventsPerHeartBeat := 1000 eventsPerBatch := int(req.PushBatchSize) maxParallelism := req.PushParallelism @@ -114,7 +122,9 @@ func (c *EventHubConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S for i, record := range batch.Records { json, err := record.GetItems().ToJSON() if err != nil { - log.Errorf("failed to convert record to json: %v", err) + log.WithFields(log.Fields{ + "flowName": req.FlowJobName, + }).Infof("failed to convert record to json: %v", err) return nil, err } @@ -132,7 +142,8 @@ func (c *EventHubConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S } if (i+1)%eventsPerBatch == 0 { - err := c.sendEventBatch(batchPerTopic, maxParallelism) + err := c.sendEventBatch(batchPerTopic, maxParallelism, + req.FlowJobName) if err != nil { return nil, err } @@ -143,13 +154,16 @@ func (c *EventHubConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S // send the remaining events. if len(batchPerTopic) > 0 { - err := c.sendEventBatch(batchPerTopic, maxParallelism) + err := c.sendEventBatch(batchPerTopic, maxParallelism, + req.FlowJobName) if err != nil { return nil, err } } - log.Infof("[total] successfully sent %d records to event hub", len(batch.Records)) + log.WithFields(log.Fields{ + "flowName": req.FlowJobName, + }).Infof("[total] successfully sent %d records to event hub", len(batch.Records)) err := c.UpdateLastOffset(req.FlowJobName, batch.LastCheckPointID) if err != nil { @@ -164,9 +178,13 @@ func (c *EventHubConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S }, nil } -func (c *EventHubConnector) sendEventBatch(events map[string][]*eventhub.Event, maxParallelism int64) error { +func (c *EventHubConnector) sendEventBatch(events map[string][]*eventhub.Event, + maxParallelism int64, + flowName string) error { if len(events) == 0 { - log.Info("no events to send") + log.WithFields(log.Fields{ + "flowName": flowName, + }).Infof("no events to send") return nil } @@ -194,7 +212,10 @@ func (c *EventHubConnector) sendEventBatch(events map[string][]*eventhub.Event, once.Do(func() { firstErr = err }) return } - + log.WithFields(log.Fields{ + "flowName": flowName, + }).Infof("obtained hub connection and now sending %d events to event hub: %s", + len(eventBatch), tblName) err = hub.SendBatch(subCtx, eventhub.NewEventBatchIterator(eventBatch...)) if err != nil { once.Do(func() { firstErr = err }) @@ -202,6 +223,10 @@ func (c *EventHubConnector) sendEventBatch(events map[string][]*eventhub.Event, } atomic.AddInt32(&numEventsPushed, int32(len(eventBatch))) + log.WithFields(log.Fields{ + "flowName": flowName, + }).Infof("pushed %d events to event hub: %s", + numEventsPushed, tblName) }(tblName, eventBatch) } @@ -237,7 +262,7 @@ func (c *EventHubConnector) CreateRawTable(req *protos.CreateRawTableInput) (*pr tableMap := req.GetTableNameMapping() for _, table := range tableMap { - err := c.ensureEventHub(c.ctx, table) + err := c.ensureEventHub(c.ctx, table, req.FlowJobName) if err != nil { log.WithFields(log.Fields{ "flowName": req.FlowJobName, @@ -255,7 +280,7 @@ func (c *EventHubConnector) GetTableSchema( panic("get table schema not implemented for event hub") } -func (c *EventHubConnector) ensureEventHub(ctx context.Context, name string) error { +func (c *EventHubConnector) ensureEventHub(ctx context.Context, name string, flowName string) error { hubClient, err := c.getEventHubMgmtClient() if err != nil { return err @@ -282,7 +307,9 @@ func (c *EventHubConnector) ensureEventHub(ctx context.Context, name string) err return err } - log.Infof("event hub %s created", name) + log.WithFields(log.Fields{ + "flowName": flowName, + }).Infof("event hub %s created", name) } else { log.Infof("event hub %s already exists", name) } diff --git a/flow/connectors/eventhub/metadata.go b/flow/connectors/eventhub/metadata.go index 84175b0d27..07bf30b339 100644 --- a/flow/connectors/eventhub/metadata.go +++ b/flow/connectors/eventhub/metadata.go @@ -125,7 +125,9 @@ func (c *EventHubConnector) GetLastOffset(jobName string) (*protos.LastSyncState }, nil } - log.Errorf("failed to get last offset: %v", err) + log.WithFields(log.Fields{ + "flowName": jobName, + }).Errorf("failed to get last offset: %v", err) return nil, err } @@ -155,7 +157,9 @@ func (c *EventHubConnector) UpdateLastOffset(jobName string, offset int64) error } // update the last offset - log.Infof("updating last offset for job `%s` to `%d`", jobName, offset) + log.WithFields(log.Fields{ + "flowName": jobName, + }).Infof("updating last offset for job `%s` to `%d`", jobName, offset) _, err = tx.Exec(c.ctx, ` INSERT INTO `+metadataSchema+`.`+lastSyncStateTableName+` (job_name, last_offset) VALUES ($1, $2)