diff --git a/flow/e2e/postgres/qrep_flow_pg_test.go b/flow/e2e/postgres/qrep_flow_pg_test.go index 4f2f944a97..df6ee3f1df 100644 --- a/flow/e2e/postgres/qrep_flow_pg_test.go +++ b/flow/e2e/postgres/qrep_flow_pg_test.go @@ -220,6 +220,48 @@ func (s PeerFlowE2ETestSuitePG) Test_Complete_QRep_Flow_Multi_Insert_PG() { require.NoError(s.t, err) } +func (s PeerFlowE2ETestSuitePG) Test_PG_TypeSystemQRep() { + numRows := 10 + + srcTable := "test_qrep_flow_pgpg_1" + s.setupSourceTable(srcTable, numRows) + + dstTable := "test_qrep_flow_pgpg_2" + + err := e2e.CreateTableForQRep(s.Conn(), s.suffix, dstTable) + require.NoError(s.t, err) + + srcSchemaQualified := fmt.Sprintf("%s_%s.%s", "e2e_test", s.suffix, srcTable) + dstSchemaQualified := fmt.Sprintf("%s_%s.%s", "e2e_test", s.suffix, dstTable) + + query := fmt.Sprintf("SELECT * FROM e2e_test_%s.%s WHERE updated_at BETWEEN {{.start}} AND {{.end}}", + s.suffix, srcTable) + + postgresPeer := e2e.GeneratePostgresPeer() + + qrepConfig, err := e2e.CreateQRepWorkflowConfig( + "test_qrep_flow_pgpg", + srcSchemaQualified, + dstSchemaQualified, + query, + postgresPeer, + "", + true, + "", + "", + ) + require.NoError(s.t, err) + qrepConfig.System = protos.TypeSystem_PG + + tc := e2e.NewTemporalClient(s.t) + env := e2e.RunQRepFlowWorkflow(tc, qrepConfig) + e2e.EnvWaitForFinished(s.t, env, 3*time.Minute) + require.NoError(s.t, env.Error()) + + err = s.comparePGTables(srcSchemaQualified, dstSchemaQualified, "*") + require.NoError(s.t, err) +} + func (s PeerFlowE2ETestSuitePG) Test_PeerDB_Columns_QRep_PG() { numRows := 10