Skip to content

Commit

Permalink
tablemapping and keeping track of total records
Browse files Browse the repository at this point in the history
  • Loading branch information
Amogh-Bharadwaj committed Sep 21, 2023
1 parent 0e85c76 commit 3582c69
Showing 1 changed file with 18 additions and 15 deletions.
33 changes: 18 additions & 15 deletions flow/connectors/eventhub/eventhub.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,14 @@ import (
)

type EventHubConnector struct {
ctx context.Context
config *protos.EventHubConfig
pgMetadata *PostgresMetadataStore
tableSchemas map[string]*protos.TableSchema
creds *azidentity.DefaultAzureCredential
tokenProvider auth.TokenProvider
hubs map[string]*eventhub.Hub
ctx context.Context
config *protos.EventHubConfig
pgMetadata *PostgresMetadataStore
tableSchemas map[string]*protos.TableSchema
creds *azidentity.DefaultAzureCredential
tokenProvider auth.TokenProvider
hubs map[string]*eventhub.Hub
totalRecordsSynced int
}

// NewEventHubConnector creates a new EventHubConnector.
Expand All @@ -55,12 +56,13 @@ func NewEventHubConnector(
}

return &EventHubConnector{
ctx: ctx,
config: config,
pgMetadata: pgMetadata,
creds: defaultAzureCreds,
tokenProvider: jwtTokenProvider,
hubs: make(map[string]*eventhub.Hub),
ctx: ctx,
config: config,
pgMetadata: pgMetadata,
creds: defaultAzureCreds,
tokenProvider: jwtTokenProvider,
hubs: make(map[string]*eventhub.Hub),
totalRecordsSynced: 0,
}, nil
}

Expand Down Expand Up @@ -163,7 +165,7 @@ func (c *EventHubConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S
log.WithFields(log.Fields{
"flowName": req.FlowJobName,
}).Infof("[total] successfully sent %d records to event hub", rowsSynced)

c.totalRecordsSynced += int(rowsSynced)
err := c.updateLastOffset(req.FlowJobName, batch.LastCheckPointID)
if err != nil {
log.Errorf("failed to update last offset: %v", err)
Expand All @@ -177,11 +179,12 @@ func (c *EventHubConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S

metrics.LogSyncMetrics(c.ctx, req.FlowJobName, int64(rowsSynced), time.Since(startTime))
metrics.LogNormalizeMetrics(c.ctx, req.FlowJobName, int64(rowsSynced),
time.Since(startTime), int64(rowsSynced))
time.Since(startTime), int64(c.totalRecordsSynced))
return &model.SyncResponse{
FirstSyncedCheckPointID: batch.FirstCheckPointID,
LastSyncedCheckPointID: batch.LastCheckPointID,
NumRecordsSynced: int64(len(batch.Records)),
TableNameRowsMapping: tableNameRowsMapping,
}, nil
}

Expand Down

0 comments on commit 3582c69

Please sign in to comment.