Skip to content

Commit

Permalink
postgres_repl_test: combine into e2e/postgres
Browse files Browse the repository at this point in the history
  • Loading branch information
serprex committed Jan 2, 2024
1 parent 5d731b1 commit 645acae
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 150 deletions.
150 changes: 0 additions & 150 deletions flow/connectors/postgres/postgres_repl_test.go

This file was deleted.

46 changes: 46 additions & 0 deletions flow/e2e/postgres/qrep_flow_pg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"log/slog"
"strings"
"testing"
"time"

connpostgres "github.com/PeerDB-io/peer-flow/connectors/postgres"
"github.com/PeerDB-io/peer-flow/e2e"
Expand Down Expand Up @@ -39,6 +40,8 @@ func TestPeerFlowE2ETestSuitePG(t *testing.T) {
}

func SetupSuite(t *testing.T, g got.G) PeerFlowE2ETestSuitePG {
t.Helper()

err := godotenv.Load()
if err != nil {
// it's okay if the .env file is not present
Expand Down Expand Up @@ -165,6 +168,49 @@ func (s PeerFlowE2ETestSuitePG) checkSyncedAt(dstSchemaQualified string) error {
return rows.Err()
}

func (s PeerFlowE2ETestSuitePG) TestSimpleSlotCreation() {
setupTx, err := s.pool.Begin(context.Background())
require.NoError(s.t, err)
// setup 3 tables in pgpeer_repl_test schema
// test_1, test_2, test_3, all have 5 columns all text, c1, c2, c3, c4, c5
tables := []string{"test_1", "test_2", "test_3"}
for _, table := range tables {
_, err = setupTx.Exec(context.Background(),
fmt.Sprintf("CREATE TABLE e2e_test_%s.%s (c1 text, c2 text, c3 text, c4 text, c5 text)", s.suffix, table))
require.NoError(s.t, err)
}

err = setupTx.Commit(context.Background())
require.NoError(s.t, err)

flowJobName := "test_simple_slot_creation"
flowLog := slog.String(string(shared.FlowNameKey), flowJobName)
setupReplicationInput := &protos.SetupReplicationInput{
FlowJobName: flowJobName,
TableNameMapping: map[string]string{
fmt.Sprintf("e2e_test_%s.test_1", s.suffix): "test_1_dst",
},
}

signal := connpostgres.NewSlotSignal()

// Moved to a go routine
go func() {
err := s.connector.SetupReplication(signal, setupReplicationInput)
require.NoError(s.t, err)
}()

slog.Info("waiting for slot creation to complete", flowLog)
slotInfo := <-signal.SlotCreated
slog.Info(fmt.Sprintf("slot creation complete: %v", slotInfo), flowLog)

slog.Info("signaling clone complete after waiting for 2 seconds", flowLog)
time.Sleep(2 * time.Second)
signal.CloneComplete <- struct{}{}

slog.Info("successfully setup replication", flowLog)
}

func (s PeerFlowE2ETestSuitePG) Test_Complete_QRep_Flow_Multi_Insert_PG() {
env := e2e.NewTemporalTestWorkflowEnvironment()
e2e.RegisterWorkflowsAndActivities(s.t, env)
Expand Down

0 comments on commit 645acae

Please sign in to comment.