diff --git a/flow/connectors/clickhouse/qrep_avro_sync.go b/flow/connectors/clickhouse/qrep_avro_sync.go index fa7f03cf88..edbd0392c9 100644 --- a/flow/connectors/clickhouse/qrep_avro_sync.go +++ b/flow/connectors/clickhouse/qrep_avro_sync.go @@ -5,6 +5,7 @@ import ( "fmt" "log/slog" "strings" + "time" "github.com/ClickHouse/clickhouse-go/v2/lib/driver" @@ -103,6 +104,7 @@ func (s *ClickHouseAvroSyncMethod) SyncQRepRecords( ) (int, error) { dstTableName := config.DestinationTableIdentifier stagingPath := s.connector.credsProvider.BucketPath + startTime := time.Now() avroSchema, err := s.getAvroSchema(dstTableName, stream.Schema()) if err != nil { @@ -154,6 +156,11 @@ func (s *ClickHouseAvroSyncMethod) SyncQRepRecords( return 0, err } + if err := s.connector.FinishQRepPartition(ctx, partition, config.FlowJobName, startTime); err != nil { + s.connector.logger.Error("Failed to finish QRep partition", slog.Any("error", err)) + return 0, err + } + return avroFile.NumRecords, nil }