Skip to content

Commit

Permalink
[clickhouse] test for replident full with unchanged TOAST (#2123)
Browse files Browse the repository at this point in the history
followup on #2092
  • Loading branch information
heavycrystal authored Oct 2, 2024
1 parent 5d847b5 commit bb45115
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 1 deletion.
1 change: 0 additions & 1 deletion flow/connectors/postgres/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -758,7 +758,6 @@ func processUpdateMessage[Items model.Items](
and only the old tuple does. So we can backfill the new tuple with the unchanged columns from the old tuple.
Otherwise, _peerdb_unchanged_toast_columns is set correctly and we fall back to normal unchanged TOAST handling in normalize,
but this doesn't work in connectors where we don't do unchanged TOAST handling in normalize.
TODO: investigate the cases where this happens in more detail.
*/
backfilledCols := newItems.UpdateIfNotExists(oldItems)
for _, col := range backfilledCols {
Expand Down
47 changes: 47 additions & 0 deletions flow/e2e/clickhouse/peer_flow_ch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package e2e_clickhouse

import (
"context"
"embed"
"fmt"
"testing"
"time"
Expand All @@ -16,6 +17,9 @@ import (
peerflow "github.com/PeerDB-io/peer-flow/workflows"
)

// testData embeds the files matched by test_data/* so that tests in this
// package can read their fixtures through the embed.FS API.
//
//go:embed test_data/*
var testData embed.FS

// TestPeerFlowE2ETestSuiteCH is the `go test` entry point for this package's
// suite: it hands the testing.T and the SetupSuite constructor to
// e2eshared.RunSuite.
func TestPeerFlowE2ETestSuiteCH(t *testing.T) {
	e2eshared.RunSuite(t, SetupSuite)
}
Expand Down Expand Up @@ -355,3 +359,46 @@ func (s ClickHouseSuite) Test_Update_PKey_Env_Enabled() {
env.Cancel()
e2e.RequireEnvCanceled(s.t, env)
}

// Test_Replident_Full_Unchanged_TOAST_Updates runs a CDC flow from a source
// table declared with REPLICA IDENTITY FULL and checks that the destination
// stays equal to the source after an UPDATE that leaves the text column
// untouched.
//
// Steps:
//  1. create the source table and switch it to REPLICA IDENTITY FULL;
//  2. start the CDC workflow mapping the source table to the destination;
//  3. insert one row whose text column holds test_data/big_data.json
//     (presumably large enough to be TOASTed — confirm against the fixture);
//  4. update only c1 on that row;
//  5. after both the insert and the update, wait until source and destination
//     agree on id, c1, c2 and t.
func (s ClickHouseSuite) Test_Replident_Full_Unchanged_TOAST_Updates() {
	const (
		srcTableName = "test_replident_full_toast"
		dstTableName = "test_replident_full_toast_dst"
		comparedCols = "id,c1,c2,t"
	)

	ctx := context.Background()
	srcFullName := s.attachSchemaSuffix(srcTableName)

	createStmt := fmt.Sprintf(`
CREATE TABLE IF NOT EXISTS %s(
id INT PRIMARY KEY GENERATED BY DEFAULT AS IDENTITY,
c1 INT,
c2 INT,
t TEXT);
ALTER TABLE %s REPLICA IDENTITY FULL`, srcFullName, srcFullName)
	_, err := s.Conn().Exec(ctx, createStmt)
	require.NoError(s.t, err)

	genCfg := e2e.FlowConnectionGenerationConfig{
		FlowJobName:      s.attachSuffix("clickhouse_test_replident_full_toast"),
		TableNameMapping: map[string]string{srcFullName: dstTableName},
		Destination:      s.Peer().Name,
	}
	flowCfg := genCfg.GenerateFlowConnectionConfigs(s.t)

	temporalClient := e2e.NewTemporalClient(s.t)
	env := e2e.ExecutePeerflow(temporalClient, peerflow.CDCFlowWorkflow, flowCfg, nil)
	e2e.SetupCDCFlowStatusQuery(s.t, env, flowCfg)

	// The fixture supplies the value of the text column for the single row.
	payload, err := testData.ReadFile("test_data/big_data.json")
	require.NoError(s.t, err)
	bigText := string(payload)

	insertStmt := fmt.Sprintf(`
INSERT INTO %s (c1,c2,t) VALUES ($1,$2,$3)`, srcFullName)
	_, err = s.Conn().Exec(ctx, insertStmt, 1, 2, bigText)
	require.NoError(s.t, err)
	e2e.EnvWaitForEqualTablesWithNames(env, s, "waiting on initial insert", srcTableName, dstTableName, comparedCols)

	// Only c1 changes here; t is left as inserted.
	updateStmt := fmt.Sprintf(`
UPDATE %s SET c1=$1 WHERE id=$2`, srcFullName)
	_, err = s.Conn().Exec(ctx, updateStmt, 3, 1)
	require.NoError(s.t, err)
	e2e.EnvWaitForEqualTablesWithNames(env, s, "waiting on update", srcTableName, dstTableName, comparedCols)

	env.Cancel()
	e2e.RequireEnvCanceled(s.t, env)
}
1 change: 1 addition & 0 deletions flow/e2e/clickhouse/test_data/big_data.json

Large diffs are not rendered by default.

0 comments on commit bb45115

Please sign in to comment.