From 3f48101348a8570e15a3d2985631c3fad65abf99 Mon Sep 17 00:00:00 2001 From: Amogh Bharadwaj Date: Fri, 1 Sep 2023 21:19:29 +0530 Subject: [PATCH] Eventhub Normalize Fix (#370) `res` (result of normalize flow) in the below call is nil in case of Eventhub so we make it a non-nil: ```Go err = a.CatalogMirrorMonitor.UpdateEndTimeForCDCBatch(ctx, input.FlowConnectionConfigs.FlowJobName, res.EndBatchID) ``` --- flow/connectors/eventhub/eventhub.go | 6 +++++- flow/connectors/postgres/cdc.go | 2 +- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/flow/connectors/eventhub/eventhub.go b/flow/connectors/eventhub/eventhub.go index f94b5b0d3d..e674da2f67 100644 --- a/flow/connectors/eventhub/eventhub.go +++ b/flow/connectors/eventhub/eventhub.go @@ -310,7 +310,11 @@ func (c *EventHubConnector) SetupNormalizedTables( func (c *EventHubConnector) NormalizeRecords(req *model.NormalizeRecordsRequest) (*model.NormalizeResponse, error) { log.Infof("normalization for event hub is a no-op") - return nil, nil + return &model.NormalizeResponse{ + EndBatchID: 0, + StartBatchID: 0, + Done: true, + }, nil } // cleanup diff --git a/flow/connectors/postgres/cdc.go b/flow/connectors/postgres/cdc.go index f89ce4419f..a3e45b6ed4 100644 --- a/flow/connectors/postgres/cdc.go +++ b/flow/connectors/postgres/cdc.go @@ -144,7 +144,7 @@ func (p *PostgresCDCSource) consumeStream( if err != nil { return nil, fmt.Errorf("SendStandbyStatusUpdate failed: %w", err) } - log.Debugf("Sent Standby status message") + log.Infof("Sent Standby status message") nextStandbyMessageDeadline = time.Now().Add(standbyMessageTimeout) }