Skip to content

Commit

Permalink
e2e
Browse files Browse the repository at this point in the history
  • Loading branch information
serprex committed May 8, 2024
1 parent fae7d55 commit 9e56e46
Showing 1 changed file with 52 additions and 0 deletions.
52 changes: 52 additions & 0 deletions flow/e2e/postgres/qrep_flow_pg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -406,3 +406,55 @@ func (s PeerFlowE2ETestSuitePG) Test_Pause() {
env.Cancel()
e2e.RequireEnvCanceled(s.t, env)
}

func (s PeerFlowE2ETestSuitePG) TestTransform() {
numRows := 10

srcTable := "test_transform"
s.setupSourceTable(srcTable, numRows)

dstTable := "test_transformdst"

srcSchemaQualified := fmt.Sprintf("%s_%s.%s", "e2e_test", s.suffix, srcTable)
dstSchemaQualified := fmt.Sprintf("%s_%s.%s", "e2e_test", s.suffix, dstTable)

query := fmt.Sprintf("SELECT * FROM %s WHERE updated_at BETWEEN {{.start}} AND {{.end}}", srcSchemaQualified)

postgresPeer := e2e.GeneratePostgresPeer()

_, err := s.Conn().Exec(context.Background(), `insert into public.scripts (name, lang, source) values
('pgtransform', 'lua', 'function transformRow(row) row.myreal = 1729 end') on conflict do nothing`)
require.NoError(s.t, err)

qrepConfig, err := e2e.CreateQRepWorkflowConfig(
"test_transform",
srcSchemaQualified,
dstSchemaQualified,
query,
postgresPeer,
"",
true,
"_PEERDB_SYNCED_AT",
"",
)
require.NoError(s.t, err)
qrepConfig.WriteMode = &protos.QRepWriteMode{
WriteType: protos.QRepWriteType_QREP_WRITE_MODE_OVERWRITE,
}
qrepConfig.InitialCopyOnly = false
qrepConfig.Script = "pgtransform"

tc := e2e.NewTemporalClient(s.t)
env := e2e.RunQRepFlowWorkflow(tc, qrepConfig)
e2e.EnvWaitFor(s.t, env, 3*time.Minute, "waiting for first sync to complete", func() bool {
err = s.compareCounts(dstSchemaQualified, int64(numRows))
return err == nil
})
require.NoError(s.t, env.Error())

var exists bool
err = s.Conn().QueryRow(context.Background(),
fmt.Sprintf("select exists(select * from %s where myreal <> 1729)", dstSchemaQualified)).Scan(&exists)
require.NoError(s.t, err)
require.False(s.t, exists)
}

0 comments on commit 9e56e46

Please sign in to comment.