Skip to content

Commit

Permalink
[clickhouse] actually call FinishQRepPartition to store partition sta…
Browse files Browse the repository at this point in the history
…te (#2218)

otherwise partitions will repeat themselves because we think it hasn't
synced
  • Loading branch information
heavycrystal authored Nov 5, 2024
1 parent fc59d94 commit 6e7a219
Showing 1 changed file with 7 additions and 0 deletions.
7 changes: 7 additions & 0 deletions flow/connectors/clickhouse/qrep_avro_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"log/slog"
"strings"
"time"

"github.com/ClickHouse/clickhouse-go/v2/lib/driver"

Expand Down Expand Up @@ -103,6 +104,7 @@ func (s *ClickHouseAvroSyncMethod) SyncQRepRecords(
) (int, error) {
dstTableName := config.DestinationTableIdentifier
stagingPath := s.connector.credsProvider.BucketPath
startTime := time.Now()

avroSchema, err := s.getAvroSchema(dstTableName, stream.Schema())
if err != nil {
Expand Down Expand Up @@ -154,6 +156,11 @@ func (s *ClickHouseAvroSyncMethod) SyncQRepRecords(
return 0, err
}

if err := s.connector.FinishQRepPartition(ctx, partition, config.FlowJobName, startTime); err != nil {
s.connector.logger.Error("Failed to finish QRep partition", slog.Any("error", err))
return 0, err
}

return avroFile.NumRecords, nil
}

Expand Down

0 comments on commit 6e7a219

Please sign in to comment.