diff --git a/flow/e2e/postgres/peer_flow_pg_test.go b/flow/e2e/postgres/peer_flow_pg_test.go index abc23c4a89..642bc13a94 100644 --- a/flow/e2e/postgres/peer_flow_pg_test.go +++ b/flow/e2e/postgres/peer_flow_pg_test.go @@ -10,13 +10,16 @@ import ( "github.com/jackc/pgerrcode" "github.com/jackc/pgx/v5/pgconn" "github.com/jackc/pgx/v5/pgtype" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.temporal.io/sdk/testsuite" + "go.temporal.io/sdk/worker" "github.com/PeerDB-io/peer-flow/connectors/utils" "github.com/PeerDB-io/peer-flow/e2e" "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/PeerDB-io/peer-flow/model/qvalue" + "github.com/PeerDB-io/peer-flow/shared" peerflow "github.com/PeerDB-io/peer-flow/workflows" ) @@ -1103,180 +1106,201 @@ func (s PeerFlowE2ETestSuitePG) Test_Supported_Mixed_Case_Table() { // test don't work, make it work later -// func (s PeerFlowE2ETestSuitePG) Test_Dynamic_Mirror_Config_Via_Signals() { -// env := e2e.NewTemporalTestWorkflowEnvironment(s.t) -// // needed otherwise errors out -// workerOptions := worker.Options{ -// EnableSessionWorker: true, -// } -// env.SetWorkerOptions(workerOptions) - -// srcTable1Name := s.attachSchemaSuffix("test_dynconfig_1") -// srcTable2Name := s.attachSchemaSuffix("test_dynconfig_2") -// dstTable1Name := s.attachSchemaSuffix("test_dynconfig_1_dst") -// dstTable2Name := s.attachSchemaSuffix("test_dynconfig_2_dst") -// sentPause := false -// sentUpdate := false - -// // signals in tests are weird, you need to register them before starting the workflow -// // otherwise you guessed it, errors out. really don't like this. -// // too short of a gap between signals also causes issues -// // might have something to do with how test workflows handle fast-forwarding time. -// env.RegisterDelayedCallback(func() { -// env.SignalWorkflow(shared.FlowSignalName, shared.PauseSignal) -// s.t.Log("Sent pause signal") -// sentPause = true -// }, 28*time.Second) -// env.RegisterDelayedCallback(func() { -// env.SignalWorkflow(shared.CDCDynamicPropertiesSignalName, &protos.CDCFlowConfigUpdate{ -// IdleTimeout: 14, -// BatchSize: 12, -// AdditionalTables: []*protos.TableMapping{ -// { -// SourceTableIdentifier: srcTable2Name, -// DestinationTableIdentifier: dstTable2Name, -// }, -// }, -// }) -// s.t.Log("Sent update signal") -// sentUpdate = true -// }, 56*time.Second) -// env.RegisterDelayedCallback(func() { -// env.SignalWorkflow(shared.FlowSignalName, shared.NoopSignal) -// s.t.Log("Sent resume signal") -// }, 84*time.Second) - -// _, err := s.conn.Exec(context.Background(), fmt.Sprintf(` -// CREATE TABLE IF NOT EXISTS %s ( -// id INT PRIMARY KEY GENERATED BY DEFAULT AS IDENTITY, -// t TEXT DEFAULT md5(random()::text)); -// CREATE TABLE IF NOT EXISTS %s ( -// id INT PRIMARY KEY GENERATED BY DEFAULT AS IDENTITY, -// t TEXT DEFAULT md5(random()::text)); -// `, srcTable1Name, srcTable2Name)) -// require.NoError(s.t, err) - -// connectionGen := e2e.FlowConnectionGenerationConfig{ -// FlowJobName: s.attachSuffix("test_dynconfig"), -// } - -// config := &protos.FlowConnectionConfigs{ -// FlowJobName: connectionGen.FlowJobName, -// Destination: s.peer, -// TableMappings: []*protos.TableMapping{ -// { -// SourceTableIdentifier: srcTable1Name, -// DestinationTableIdentifier: dstTable1Name, -// }, -// }, -// Source: e2e.GeneratePostgresPeer(e2e.PostgresPort), -// CdcStagingPath: connectionGen.CdcStagingPath, -// MaxBatchSize: 6, -// IdleTimeoutSeconds: 7, -// DoInitialSnapshot: true, -// SnapshotNumRowsPerPartition: 1000, -// SnapshotMaxParallelWorkers: 1, -// SnapshotNumTablesInParallel: 1, -// } - -// addRows := func(numRows int) { -// for i := 0; i < numRows; i++ { -// _, err = s.conn.Exec(context.Background(), -// fmt.Sprintf(`INSERT INTO %s DEFAULT VALUES`, srcTable1Name)) -// e2e.EnvNoError(s.t, env, err) -// _, err = s.conn.Exec(context.Background(), -// fmt.Sprintf(`INSERT INTO %s DEFAULT VALUES`, srcTable2Name)) -// e2e.EnvNoError(s.t, env, err) -// } -// s.t.Logf("Inserted %d rows into the source table", numRows) -// } - -// getWorkFlowState := func() peerflow.CDCFlowWorkflowState { -// var workflowState peerflow.CDCFlowWorkflowState -// val, err := env.QueryWorkflow(shared.CDCFlowStateQuery) -// e2e.EnvNoError(s.t, env, err) -// err = val.Get(&workflowState) -// e2e.EnvNoError(s.t, env, err) - -// return workflowState -// } - -// getFlowStatus := func() protos.FlowStatus { -// var flowStatus protos.FlowStatus -// val, err := env.QueryWorkflow(shared.FlowStatusQuery) -// e2e.EnvNoError(s.t, env, err) -// err = val.Get(&flowStatus) -// e2e.EnvNoError(s.t, env, err) - -// return flowStatus -// } - -// // add before to test initial load too. -// addRows(18) -// go func() { -// e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen) -// // insert 18 rows into the source tables, exactly 3 batches -// addRows(18) - -// e2e.EnvWaitFor(s.t, env, 1*time.Minute, "normalize 18 records - first table", func() bool { -// return s.comparePGTables(srcTable1Name, dstTable1Name, "id,t") == nil -// }) - -// workflowState := getWorkFlowState() -// require.EqualValues(s.t, 7, workflowState.SyncFlowOptions.IdleTimeoutSeconds) -// require.EqualValues(s.t, 6, workflowState.SyncFlowOptions.BatchSize) -// require.EqualValues(s.t, 1, len(workflowState.TableMappings)) -// require.EqualValues(s.t, 1, len(workflowState.SrcTableIdNameMapping)) -// require.EqualValues(s.t, 1, len(workflowState.TableNameSchemaMapping)) -// // we have limited batch size to 6, so atleast 3 syncs needed -// require.GreaterOrEqual(s.t, len(workflowState.SyncFlowStatuses), 3) - -// // wait for first RegisterDelayedCallback to hit. -// e2e.EnvWaitFor(s.t, env, 1*time.Minute, "sent pause signal", func() bool { -// return sentPause -// }) -// e2e.EnvWaitFor(s.t, env, 1*time.Minute, "paused workflow", func() bool { -// // keep adding 1 more row - guarantee finishing another sync -// addRows(1) -// flowStatus := getFlowStatus() -// return flowStatus == protos.FlowStatus_STATUS_PAUSED -// }) -// e2e.EnvWaitFor(s.t, env, 1*time.Minute, "normalize 1 record - first table", func() bool { -// return s.comparePGTables(srcTable1Name, dstTable1Name, "id,t") == nil -// }) - -// // we have a paused mirror, wait for second signal to hit. -// e2e.EnvWaitFor(s.t, env, 1*time.Minute, "sent updates signal", func() bool { -// return sentUpdate -// }) - -// // add rows to both tables before resuming - should handle -// addRows(18) - -// e2e.EnvWaitFor(s.t, env, 1*time.Minute, "resumed workflow", func() bool { -// flowStatus := getFlowStatus() -// return flowStatus == protos.FlowStatus_STATUS_RUNNING -// }) -// e2e.EnvWaitFor(s.t, env, 1*time.Minute, "normalize 18 records - first table", func() bool { -// return s.comparePGTables(srcTable1Name, dstTable1Name, "id,t") == nil -// }) -// e2e.EnvWaitFor(s.t, env, 1*time.Minute, "initial load + normalize 18 records - second table", func() bool { -// return s.comparePGTables(srcTable2Name, dstTable2Name, "id,t") == nil -// }) - -// workflowState = getWorkFlowState() -// require.EqualValues(s.t, 14, workflowState.SyncFlowOptions.IdleTimeoutSeconds) -// require.EqualValues(s.t, 12, workflowState.SyncFlowOptions.BatchSize) -// require.EqualValues(s.t, 2, len(workflowState.TableMappings)) -// require.EqualValues(s.t, 2, len(workflowState.SrcTableIdNameMapping)) -// require.EqualValues(s.t, 2, len(workflowState.TableNameSchemaMapping)) -// // 3 from first insert of 18 rows in 1 table -// // 1 from pre-pause -// // 3 from second insert of 18 rows in 2 tables, batch size updated -// require.GreaterOrEqual(s.t, len(workflowState.SyncFlowStatuses), 3+1+3) - -// env.CancelWorkflow() -// }() - -// env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, config, nil) -// } +func (s PeerFlowE2ETestSuitePG) Test_Dynamic_Mirror_Config_Via_Signals() { + env := e2e.NewTemporalTestWorkflowEnvironment(s.t) + // needed otherwise errors out + workerOptions := worker.Options{ + EnableSessionWorker: true, + } + env.SetWorkerOptions(workerOptions) + + srcTable1Name := s.attachSchemaSuffix("test_dynconfig_1") + srcTable2Name := s.attachSchemaSuffix("test_dynconfig_2") + dstTable1Name := s.attachSchemaSuffix("test_dynconfig_1_dst") + dstTable2Name := s.attachSchemaSuffix("test_dynconfig_2_dst") + sentPause := false + isPaused := false + sentUpdate := false + + _, err := s.conn.Exec(context.Background(), fmt.Sprintf(` + CREATE TABLE IF NOT EXISTS %s ( + id INT PRIMARY KEY GENERATED BY DEFAULT AS IDENTITY, + t TEXT DEFAULT md5(random()::text)); + CREATE TABLE IF NOT EXISTS %s ( + id INT PRIMARY KEY GENERATED BY DEFAULT AS IDENTITY, + t TEXT DEFAULT md5(random()::text)); + `, srcTable1Name, srcTable2Name)) + require.NoError(s.t, err) + + connectionGen := e2e.FlowConnectionGenerationConfig{ + FlowJobName: s.attachSuffix("test_dynconfig"), + } + + config := &protos.FlowConnectionConfigs{ + FlowJobName: connectionGen.FlowJobName, + Destination: s.peer, + TableMappings: []*protos.TableMapping{ + { + SourceTableIdentifier: srcTable1Name, + DestinationTableIdentifier: dstTable1Name, + }, + }, + Source: e2e.GeneratePostgresPeer(e2e.PostgresPort), + CdcStagingPath: connectionGen.CdcStagingPath, + MaxBatchSize: 6, + IdleTimeoutSeconds: 7, + DoInitialSnapshot: true, + SnapshotNumRowsPerPartition: 1000, + SnapshotMaxParallelWorkers: 1, + SnapshotNumTablesInParallel: 1, + } + + addRows := func(numRows int) { + for i := 0; i < numRows; i++ { + _, err = s.conn.Exec(context.Background(), + fmt.Sprintf(`INSERT INTO %s DEFAULT VALUES`, srcTable1Name)) + e2e.EnvNoError(s.t, env, err) + _, err = s.conn.Exec(context.Background(), + fmt.Sprintf(`INSERT INTO %s DEFAULT VALUES`, srcTable2Name)) + e2e.EnvNoError(s.t, env, err) + } + s.t.Logf("Inserted %d rows into the source table", numRows) + } + + getWorkFlowState := func() peerflow.CDCFlowWorkflowState { + var workflowState peerflow.CDCFlowWorkflowState + val, err := env.QueryWorkflow(shared.CDCFlowStateQuery) + e2e.EnvNoError(s.t, env, err) + err = val.Get(&workflowState) + e2e.EnvNoError(s.t, env, err) + + return workflowState + } + + getFlowStatus := func() protos.FlowStatus { + var flowStatus protos.FlowStatus + val, err := env.QueryWorkflow(shared.FlowStatusQuery) + e2e.EnvNoError(s.t, env, err) + err = val.Get(&flowStatus) + e2e.EnvNoError(s.t, env, err) + + return flowStatus + } + + // signals in tests are weird, you need to register them before starting the workflow + // otherwise you guessed it, errors out. really don't like this. + // too short of a gap between signals also causes issues + // might have something to do with how test workflows handle fast-forwarding time. + env.RegisterDelayedCallback(func() { + env.SignalWorkflow(shared.FlowSignalName, shared.PauseSignal) + s.t.Log("Sent pause signal") + sentPause = true + }, 28*time.Second) + // this signal being sent also unblocks another WaitFor + env.RegisterDelayedCallback(func() { + e2e.EnvWaitFor(s.t, env, 1*time.Minute, "send update signal after pause confirmed", func() bool { + flowStatus := getFlowStatus() + if flowStatus != protos.FlowStatus_STATUS_PAUSED { + return false + } + isPaused = true + env.SignalWorkflow(shared.CDCDynamicPropertiesSignalName, &protos.CDCFlowConfigUpdate{ + IdleTimeout: 14, + BatchSize: 12, + AdditionalTables: []*protos.TableMapping{ + { + SourceTableIdentifier: srcTable2Name, + DestinationTableIdentifier: dstTable2Name, + }, + }, + }) + s.t.Log("Sent update signal") + sentUpdate = true + return true + }) + }, 56*time.Second) + env.RegisterDelayedCallback(func() { + e2e.EnvWaitFor(s.t, env, 1*time.Minute, "send resume signal after update confirmed", func() bool { + if !sentUpdate { + return false + } + env.SignalWorkflow(shared.FlowSignalName, shared.NoopSignal) + s.t.Log("Sent resume signal") + return true + }) + }, 84*time.Second) + + // add before to test initial load too. + addRows(18) + go func() { + defer env.CancelWorkflow() + e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen) + // insert 18 rows into the source tables, exactly 3 batches + addRows(18) + + e2e.EnvWaitFor(s.t, env, 1*time.Minute, "normalize 18 records - first table", func() bool { + return s.comparePGTables(srcTable1Name, dstTable1Name, "id,t") == nil + }) + + workflowState := getWorkFlowState() + assert.EqualValues(s.t, 7, workflowState.SyncFlowOptions.IdleTimeoutSeconds) + assert.EqualValues(s.t, 6, workflowState.SyncFlowOptions.BatchSize) + assert.Len(s.t, workflowState.TableMappings, 1) + assert.Len(s.t, workflowState.SrcTableIdNameMapping, 1) + assert.Len(s.t, workflowState.TableNameSchemaMapping, 1) + // we have limited batch size to 6, so atleast 3 syncs needed + assert.GreaterOrEqual(s.t, len(workflowState.SyncFlowStatuses), 3) + + if !s.t.Failed() { + // wait for first RegisterDelayedCallback to hit. + e2e.EnvWaitFor(s.t, env, 1*time.Minute, "sent pause signal", func() bool { + return sentPause + }) + e2e.EnvWaitFor(s.t, env, 1*time.Minute, "paused workflow", func() bool { + // keep adding 1 more row - guarantee finishing another sync + addRows(1) + // isPaused - set from the WaitFor that sends update signal + return isPaused + }) + e2e.EnvWaitFor(s.t, env, 1*time.Minute, "normalize 1 record - first table", func() bool { + return s.comparePGTables(srcTable1Name, dstTable1Name, "id,t") == nil + }) + + // we have a paused mirror, wait for second signal to hit. + e2e.EnvWaitFor(s.t, env, 1*time.Minute, "sent updates signal", func() bool { + return sentUpdate + }) + } + + // add rows to both tables before resuming - should handle + addRows(18) + + if !s.t.Failed() { + e2e.EnvWaitFor(s.t, env, 1*time.Minute, "resumed workflow", func() bool { + return getFlowStatus() == protos.FlowStatus_STATUS_RUNNING + }) + e2e.EnvWaitFor(s.t, env, 1*time.Minute, "normalize 18 records - first table", func() bool { + return s.comparePGTables(srcTable1Name, dstTable1Name, "id,t") == nil + }) + e2e.EnvWaitFor(s.t, env, 1*time.Minute, "initial load + normalize 18 records - second table", func() bool { + return s.comparePGTables(srcTable2Name, dstTable2Name, "id,t") == nil + }) + } + + if !s.t.Failed() { + workflowState = getWorkFlowState() + assert.EqualValues(s.t, 14, workflowState.SyncFlowOptions.IdleTimeoutSeconds) + assert.EqualValues(s.t, 12, workflowState.SyncFlowOptions.BatchSize) + assert.Len(s.t, workflowState.TableMappings, 2) + assert.Len(s.t, workflowState.SrcTableIdNameMapping, 2) + assert.Len(s.t, workflowState.TableNameSchemaMapping, 2) + // 3 from first insert of 18 rows in 1 table + // 1 from pre-pause + // 3 from second insert of 18 rows in 2 tables, batch size updated + assert.GreaterOrEqual(s.t, len(workflowState.SyncFlowStatuses), 3+1+3) + } + }() + + env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, config, nil) + e2e.RequireEnvCanceled(s.t, env) +}