diff --git a/flow/e2e/postgres/qrep_flow_pg_test.go b/flow/e2e/postgres/qrep_flow_pg_test.go index dbb6d5dc2e..d2d68a789f 100644 --- a/flow/e2e/postgres/qrep_flow_pg_test.go +++ b/flow/e2e/postgres/qrep_flow_pg_test.go @@ -169,15 +169,27 @@ func (s PeerFlowE2ETestSuitePG) checkSyncedAt(dstSchemaQualified string) error { } func (s PeerFlowE2ETestSuitePG) TestSimpleSlotCreation() { - tables := map[string]string{ - fmt.Sprintf("e2e_test_%s.test_1", s.suffix): "test_1_dst", + setupTx, err := s.pool.Begin(context.Background()) + require.NoError(s.t, err) + // setup 3 tables in pgpeer_repl_test schema + // test_1, test_2, test_3, all have 5 columns all text, c1, c2, c3, c4, c5 + tables := []string{"test_1", "test_2", "test_3"} + for _, table := range tables { + _, err = setupTx.Exec(context.Background(), + fmt.Sprintf("CREATE TABLE pgpeer_repl_test.%s (c1 text, c2 text, c3 text, c4 text, c5 text)", table)) + require.NoError(s.t, err) } + err = setupTx.Commit(context.Background()) + require.NoError(s.t, err) + flowJobName := "test_simple_slot_creation" flowLog := slog.String(string(shared.FlowNameKey), flowJobName) setupReplicationInput := &protos.SetupReplicationInput{ - FlowJobName: flowJobName, - TableNameMapping: tables, + FlowJobName: flowJobName, + TableNameMapping: map[string]string{ + fmt.Sprintf("e2e_test_%s.test_1", s.suffix): "test_1_dst", + }, } signal := connpostgres.NewSlotSignal()