From ec91fe1e0b67e0e01308ad40a71a8d3d37f1144c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Wed, 8 May 2024 01:58:10 +0000 Subject: [PATCH] e2e --- flow/e2e/postgres/qrep_flow_pg_test.go | 54 ++++++++++++++++++++++++++ 1 file changed, 54 insertions(+) diff --git a/flow/e2e/postgres/qrep_flow_pg_test.go b/flow/e2e/postgres/qrep_flow_pg_test.go index 63a226ae13..8601bffdad 100644 --- a/flow/e2e/postgres/qrep_flow_pg_test.go +++ b/flow/e2e/postgres/qrep_flow_pg_test.go @@ -406,3 +406,57 @@ func (s PeerFlowE2ETestSuitePG) Test_Pause() { env.Cancel() e2e.RequireEnvCanceled(s.t, env) } + +func (s PeerFlowE2ETestSuitePG) TestTransform() { + numRows := 10 + + srcTable := "test_transform" + s.setupSourceTable(srcTable, numRows) + + dstTable := "test_transformdst" + + srcSchemaQualified := fmt.Sprintf("%s_%s.%s", "e2e_test", s.suffix, srcTable) + dstSchemaQualified := fmt.Sprintf("%s_%s.%s", "e2e_test", s.suffix, dstTable) + + query := fmt.Sprintf("SELECT * FROM e2e_test_%s.%s WHERE updated_at BETWEEN {{.start}} AND {{.end}}", + s.suffix, srcTable) + + postgresPeer := e2e.GeneratePostgresPeer() + + _, err := s.Conn().Exec(context.Background(), `insert into public.scripts (name, lang, source) values + ('pgtransform', 'lua', 'function transformRow(row) row.myreal = 1729 end') on conflict do nothing`) + require.NoError(s.t, err) + + qrepConfig, err := e2e.CreateQRepWorkflowConfig( + "test_transform", + srcSchemaQualified, + dstSchemaQualified, + query, + postgresPeer, + "", + true, + "_PEERDB_SYNCED_AT", + "", + ) + require.NoError(s.t, err) + qrepConfig.WriteMode = &protos.QRepWriteMode{ + WriteType: protos.QRepWriteType_QREP_WRITE_MODE_OVERWRITE, + } + qrepConfig.InitialCopyOnly = false + qrepConfig.Script = "pgtransform" + + tc := e2e.NewTemporalClient(s.t) + env := e2e.RunQRepFlowWorkflow(tc, qrepConfig) + e2e.EnvWaitFor(s.t, env, 3*time.Minute, "waiting for first sync to complete", func() bool { + err = s.compareCounts(dstSchemaQualified, int64(numRows)) + return err == nil + }) + require.NoError(s.t, env.Error()) + + var exists bool + err = s.Conn().QueryRow(context.Background(), + fmt.Sprintf("select exists(select * from e2e_test_%s.%s where myreal <> 1729)", + s.suffix, srcTable)).Scan(&exists) + require.NoError(s.t, err) + require.False(s.t, exists) +}