Skip to content

Commit

Permalink
log latest ts at eventhub
Browse files Browse the repository at this point in the history
  • Loading branch information
iskakaushik committed Oct 29, 2023
1 parent e515a5c commit 9fc3aeb
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 1 deletion.
18 changes: 17 additions & 1 deletion flow/connectors/eventhub/eventhub.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,9 +133,19 @@ func (c *EventHubConnector) processBatch(
batchPerTopic := NewHubBatches(c.hubManager)
toJSONOpts := model.NewToJSONOptions(c.config.UnnestColumns)

maxTime := time.Time{}

numRecords := 0
for record := range batch.GetRecords() {
numRecords++

tsval, err := record.GetItems().GetColValueAsTimestamp("ts")
if err == nil {
if tsval.After(maxTime) {
maxTime = *tsval
}
}

json, err := record.GetItems().ToJSONWithOpts(toJSONOpts)
if err != nil {
log.WithFields(log.Fields{
Expand Down Expand Up @@ -187,9 +197,15 @@ func (c *EventHubConnector) processBatch(
}
}

utcMaxTSString := ""
if !maxTime.IsZero() {
utcMaxTSString = maxTime.UTC().Format(time.RFC3339)
}

log.WithFields(log.Fields{
"flowName": flowJobName,
}).Infof("[total] successfully sent %d records to event hub", numRecords)
}).Infof("[total] successfully sent %d records to event hub - max ts in utc - %s",
numRecords, utcMaxTSString)
return uint32(numRecords), nil
}

Expand Down
15 changes: 15 additions & 0 deletions flow/model/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,21 @@ type Record interface {
GetItems() *RecordItems
}

// get col value as timestamp (on record)
func (r *RecordItems) GetColValueAsTimestamp(colName string) (*time.Time, error) {
val, err := r.GetValueByColName(colName)
if err != nil {
return nil, err
}

t, ok := val.Value.(time.Time)
if !ok {
return nil, fmt.Errorf("expected time.Time value for column %s", colName)
}

return &t, nil
}

type RecordItems struct {
colToValIdx map[string]int
values []*qvalue.QValue
Expand Down

0 comments on commit 9fc3aeb

Please sign in to comment.