From 9fc3aebb6242758f2cb42111a01ee00dcd260318 Mon Sep 17 00:00:00 2001 From: Kaushik Iska Date: Sun, 29 Oct 2023 12:35:47 -0400 Subject: [PATCH] log latest ts at eventhub --- flow/connectors/eventhub/eventhub.go | 18 +++++++++++++++++- flow/model/model.go | 15 +++++++++++++++ 2 files changed, 32 insertions(+), 1 deletion(-) diff --git a/flow/connectors/eventhub/eventhub.go b/flow/connectors/eventhub/eventhub.go index 58210a1ad0..c604dc68a3 100644 --- a/flow/connectors/eventhub/eventhub.go +++ b/flow/connectors/eventhub/eventhub.go @@ -133,9 +133,19 @@ func (c *EventHubConnector) processBatch( batchPerTopic := NewHubBatches(c.hubManager) toJSONOpts := model.NewToJSONOptions(c.config.UnnestColumns) + maxTime := time.Time{} + numRecords := 0 for record := range batch.GetRecords() { numRecords++ + + tsval, err := record.GetItems().GetColValueAsTimestamp("ts") + if err == nil { + if tsval.After(maxTime) { + maxTime = *tsval + } + } + json, err := record.GetItems().ToJSONWithOpts(toJSONOpts) if err != nil { log.WithFields(log.Fields{ @@ -187,9 +197,15 @@ func (c *EventHubConnector) processBatch( } } + utcMaxTSString := "" + if !maxTime.IsZero() { + utcMaxTSString = maxTime.UTC().Format(time.RFC3339) + } + log.WithFields(log.Fields{ "flowName": flowJobName, - }).Infof("[total] successfully sent %d records to event hub", numRecords) + }).Infof("[total] successfully sent %d records to event hub - max ts in utc - %s", + numRecords, utcMaxTSString) return uint32(numRecords), nil } diff --git a/flow/model/model.go b/flow/model/model.go index a14c6b73a6..5124865dc9 100644 --- a/flow/model/model.go +++ b/flow/model/model.go @@ -45,6 +45,21 @@ type Record interface { GetItems() *RecordItems } +// get col value as timestamp (on record) +func (r *RecordItems) GetColValueAsTimestamp(colName string) (*time.Time, error) { + val, err := r.GetValueByColName(colName) + if err != nil { + return nil, err + } + + t, ok := val.Value.(time.Time) + if !ok { + return nil, fmt.Errorf("expected time.Time value for column %s", colName) + } + + return &t, nil +} + type RecordItems struct { colToValIdx map[string]int values []*qvalue.QValue