Skip to content

Commit

Permalink
Clickhouse resync: use Go time.now for setting synced_at column (#1951)
Browse files Browse the repository at this point in the history
In Clickhouse's RenameTables, we need to set the synced_at column of the
resynced table to the current time.
Research suggests that using now() along with allowing non deterministic
mutations in Clickhouse could be expensive and also lead to inaccuracies

This PR calculates the current timestamp in Go and substitutes it in the
alter update command.
  • Loading branch information
Amogh-Bharadwaj authored Jul 17, 2024
1 parent 4a92510 commit 3caa312
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 4 deletions.
7 changes: 5 additions & 2 deletions flow/connectors/clickhouse/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"fmt"
"log/slog"
"strings"
"time"

_ "github.com/ClickHouse/clickhouse-go/v2"
_ "github.com/ClickHouse/clickhouse-go/v2/lib/driver"
Expand Down Expand Up @@ -161,9 +162,11 @@ func (c *ClickhouseConnector) RenameTables(ctx context.Context, req *protos.Rena
for _, renameRequest := range req.RenameTableOptions {
if req.SyncedAtColName != "" {
syncedAtCol := strings.ToLower(req.SyncedAtColName)
// get the current timestamp in UTC which can be used as SQL's now()
currentTimestamp := time.Now().UTC().Format("2006-01-02 15:04:05")
err := c.execWithLogging(ctx,
fmt.Sprintf("ALTER TABLE %s UPDATE %s=now() WHERE true SETTINGS allow_nondeterministic_mutations=1",
renameRequest.CurrentName, syncedAtCol))
fmt.Sprintf("ALTER TABLE %s UPDATE %s='%s' WHERE true",
renameRequest.CurrentName, syncedAtCol, currentTimestamp))
if err != nil {
return nil, fmt.Errorf("unable to set synced at column for table %s: %w",
renameRequest.CurrentName, err)
Expand Down
5 changes: 3 additions & 2 deletions flow/connectors/clickhouse/clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,9 +78,10 @@ func (c *ClickhouseConnector) ValidateCheck(ctx context.Context) error {
return fmt.Errorf("failed to insert into validation table %s: %w", validateDummyTableName, err)
}

currentTimestamp := time.Now().UTC().Format("2006-01-02 15:04:05")
// alter update the row
err = c.database.Exec(ctx, fmt.Sprintf("ALTER TABLE %s UPDATE updated_at = now() WHERE id = 1 SETTINGS allow_nondeterministic_mutations=1",
validateDummyTableName))
err = c.database.Exec(ctx, fmt.Sprintf("ALTER TABLE %s UPDATE updated_at = '%s' WHERE id = 1",
validateDummyTableName, currentTimestamp))
if err != nil {
return fmt.Errorf("failed to update validation table %s: %w", validateDummyTableName, err)
}
Expand Down

0 comments on commit 3caa312

Please sign in to comment.