From 79915cfc567fa0cb8d9983feb65a814334838a57 Mon Sep 17 00:00:00 2001
From: Kevin Biju <52661649+heavycrystal@users.noreply.github.com>
Date: Tue, 13 Feb 2024 00:38:38 +0530
Subject: [PATCH] dynamic properties test (#1216)

also ends up exercising initial load, since rows are inserted before the mirror starts
---
 flow/e2e/postgres/peer_flow_pg_test.go | 386 +++++++++++++------------
 1 file changed, 207 insertions(+), 179 deletions(-)

diff --git a/flow/e2e/postgres/peer_flow_pg_test.go b/flow/e2e/postgres/peer_flow_pg_test.go
index abc23c4a8..642bc13a9 100644
--- a/flow/e2e/postgres/peer_flow_pg_test.go
+++ b/flow/e2e/postgres/peer_flow_pg_test.go
@@ -10,13 +10,16 @@ import (
 	"github.com/jackc/pgerrcode"
 	"github.com/jackc/pgx/v5/pgconn"
 	"github.com/jackc/pgx/v5/pgtype"
+	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 	"go.temporal.io/sdk/testsuite"
+	"go.temporal.io/sdk/worker"
 
 	"github.com/PeerDB-io/peer-flow/connectors/utils"
 	"github.com/PeerDB-io/peer-flow/e2e"
 	"github.com/PeerDB-io/peer-flow/generated/protos"
 	"github.com/PeerDB-io/peer-flow/model/qvalue"
+	"github.com/PeerDB-io/peer-flow/shared"
 	peerflow "github.com/PeerDB-io/peer-flow/workflows"
 )
 
@@ -1103,180 +1106,205 @@ func (s PeerFlowE2ETestSuitePG) Test_Supported_Mixed_Case_Table() {
 
-// test don't work, make it work later
-
-// func (s PeerFlowE2ETestSuitePG) Test_Dynamic_Mirror_Config_Via_Signals() {
-// 	env := e2e.NewTemporalTestWorkflowEnvironment(s.t)
-// 	// needed otherwise errors out
-// 	workerOptions := worker.Options{
-// 		EnableSessionWorker: true,
-// 	}
-// 	env.SetWorkerOptions(workerOptions)
-
-// 	srcTable1Name := s.attachSchemaSuffix("test_dynconfig_1")
-// 	srcTable2Name := s.attachSchemaSuffix("test_dynconfig_2")
-// 	dstTable1Name := s.attachSchemaSuffix("test_dynconfig_1_dst")
-// 	dstTable2Name := s.attachSchemaSuffix("test_dynconfig_2_dst")
-// 	sentPause := false
-// 	sentUpdate := false
-
-// 	// signals in tests are weird, you need to register them before starting the workflow
-// 	// otherwise you guessed it, errors out. really don't like this.
-// 	// too short of a gap between signals also causes issues
-// 	// might have something to do with how test workflows handle fast-forwarding time.
-// 	env.RegisterDelayedCallback(func() {
-// 		env.SignalWorkflow(shared.FlowSignalName, shared.PauseSignal)
-// 		s.t.Log("Sent pause signal")
-// 		sentPause = true
-// 	}, 28*time.Second)
-// 	env.RegisterDelayedCallback(func() {
-// 		env.SignalWorkflow(shared.CDCDynamicPropertiesSignalName, &protos.CDCFlowConfigUpdate{
-// 			IdleTimeout: 14,
-// 			BatchSize:   12,
-// 			AdditionalTables: []*protos.TableMapping{
-// 				{
-// 					SourceTableIdentifier:      srcTable2Name,
-// 					DestinationTableIdentifier: dstTable2Name,
-// 				},
-// 			},
-// 		})
-// 		s.t.Log("Sent update signal")
-// 		sentUpdate = true
-// 	}, 56*time.Second)
-// 	env.RegisterDelayedCallback(func() {
-// 		env.SignalWorkflow(shared.FlowSignalName, shared.NoopSignal)
-// 		s.t.Log("Sent resume signal")
-// 	}, 84*time.Second)
-
-// 	_, err := s.conn.Exec(context.Background(), fmt.Sprintf(`
-// 		CREATE TABLE IF NOT EXISTS %s (
-// 			id INT PRIMARY KEY GENERATED BY DEFAULT AS IDENTITY,
-// 			t TEXT DEFAULT md5(random()::text));
-// 		CREATE TABLE IF NOT EXISTS %s (
-// 			id INT PRIMARY KEY GENERATED BY DEFAULT AS IDENTITY,
-// 			t TEXT DEFAULT md5(random()::text));
-// 	`, srcTable1Name, srcTable2Name))
-// 	require.NoError(s.t, err)
-
-// 	connectionGen := e2e.FlowConnectionGenerationConfig{
-// 		FlowJobName: s.attachSuffix("test_dynconfig"),
-// 	}
-
-// 	config := &protos.FlowConnectionConfigs{
-// 		FlowJobName: connectionGen.FlowJobName,
-// 		Destination: s.peer,
-// 		TableMappings: []*protos.TableMapping{
-// 			{
-// 				SourceTableIdentifier:      srcTable1Name,
-// 				DestinationTableIdentifier: dstTable1Name,
-// 			},
-// 		},
-// 		Source:                      e2e.GeneratePostgresPeer(e2e.PostgresPort),
-// 		CdcStagingPath:              connectionGen.CdcStagingPath,
-// 		MaxBatchSize:                6,
-// 		IdleTimeoutSeconds:          7,
-// 		DoInitialSnapshot:           true,
-// 		SnapshotNumRowsPerPartition: 1000,
-// 		SnapshotMaxParallelWorkers:  1,
-// 		SnapshotNumTablesInParallel: 1,
-// 	}
-
-// 	addRows := func(numRows int) {
-// 		for i := 0; i < numRows; i++ {
-// 			_, err = s.conn.Exec(context.Background(),
-// 				fmt.Sprintf(`INSERT INTO %s DEFAULT VALUES`, srcTable1Name))
-// 			e2e.EnvNoError(s.t, env, err)
-// 			_, err = s.conn.Exec(context.Background(),
-// 				fmt.Sprintf(`INSERT INTO %s DEFAULT VALUES`, srcTable2Name))
-// 			e2e.EnvNoError(s.t, env, err)
-// 		}
-// 		s.t.Logf("Inserted %d rows into the source table", numRows)
-// 	}
-
-// 	getWorkFlowState := func() peerflow.CDCFlowWorkflowState {
-// 		var workflowState peerflow.CDCFlowWorkflowState
-// 		val, err := env.QueryWorkflow(shared.CDCFlowStateQuery)
-// 		e2e.EnvNoError(s.t, env, err)
-// 		err = val.Get(&workflowState)
-// 		e2e.EnvNoError(s.t, env, err)
-
-// 		return workflowState
-// 	}
-
-// 	getFlowStatus := func() protos.FlowStatus {
-// 		var flowStatus protos.FlowStatus
-// 		val, err := env.QueryWorkflow(shared.FlowStatusQuery)
-// 		e2e.EnvNoError(s.t, env, err)
-// 		err = val.Get(&flowStatus)
-// 		e2e.EnvNoError(s.t, env, err)
-
-// 		return flowStatus
-// 	}
-
-// 	// add before to test initial load too.
-// 	addRows(18)
-// 	go func() {
-// 		e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen)
-// 		// insert 18 rows into the source tables, exactly 3 batches
-// 		addRows(18)
-
-// 		e2e.EnvWaitFor(s.t, env, 1*time.Minute, "normalize 18 records - first table", func() bool {
-// 			return s.comparePGTables(srcTable1Name, dstTable1Name, "id,t") == nil
-// 		})
-
-// 		workflowState := getWorkFlowState()
-// 		require.EqualValues(s.t, 7, workflowState.SyncFlowOptions.IdleTimeoutSeconds)
-// 		require.EqualValues(s.t, 6, workflowState.SyncFlowOptions.BatchSize)
-// 		require.EqualValues(s.t, 1, len(workflowState.TableMappings))
-// 		require.EqualValues(s.t, 1, len(workflowState.SrcTableIdNameMapping))
-// 		require.EqualValues(s.t, 1, len(workflowState.TableNameSchemaMapping))
-// 		// we have limited batch size to 6, so atleast 3 syncs needed
-// 		require.GreaterOrEqual(s.t, len(workflowState.SyncFlowStatuses), 3)
-
-// 		// wait for first RegisterDelayedCallback to hit.
-// 		e2e.EnvWaitFor(s.t, env, 1*time.Minute, "sent pause signal", func() bool {
-// 			return sentPause
-// 		})
-// 		e2e.EnvWaitFor(s.t, env, 1*time.Minute, "paused workflow", func() bool {
-// 			// keep adding 1 more row - guarantee finishing another sync
-// 			addRows(1)
-// 			flowStatus := getFlowStatus()
-// 			return flowStatus == protos.FlowStatus_STATUS_PAUSED
-// 		})
-// 		e2e.EnvWaitFor(s.t, env, 1*time.Minute, "normalize 1 record - first table", func() bool {
-// 			return s.comparePGTables(srcTable1Name, dstTable1Name, "id,t") == nil
-// 		})
-
-// 		// we have a paused mirror, wait for second signal to hit.
-// 		e2e.EnvWaitFor(s.t, env, 1*time.Minute, "sent updates signal", func() bool {
-// 			return sentUpdate
-// 		})
-
-// 		// add rows to both tables before resuming - should handle
-// 		addRows(18)
-
-// 		e2e.EnvWaitFor(s.t, env, 1*time.Minute, "resumed workflow", func() bool {
-// 			flowStatus := getFlowStatus()
-// 			return flowStatus == protos.FlowStatus_STATUS_RUNNING
-// 		})
-// 		e2e.EnvWaitFor(s.t, env, 1*time.Minute, "normalize 18 records - first table", func() bool {
-// 			return s.comparePGTables(srcTable1Name, dstTable1Name, "id,t") == nil
-// 		})
-// 		e2e.EnvWaitFor(s.t, env, 1*time.Minute, "initial load + normalize 18 records - second table", func() bool {
-// 			return s.comparePGTables(srcTable2Name, dstTable2Name, "id,t") == nil
-// 		})
-
-// 		workflowState = getWorkFlowState()
-// 		require.EqualValues(s.t, 14, workflowState.SyncFlowOptions.IdleTimeoutSeconds)
-// 		require.EqualValues(s.t, 12, workflowState.SyncFlowOptions.BatchSize)
-// 		require.EqualValues(s.t, 2, len(workflowState.TableMappings))
-// 		require.EqualValues(s.t, 2, len(workflowState.SrcTableIdNameMapping))
-// 		require.EqualValues(s.t, 2, len(workflowState.TableNameSchemaMapping))
-// 		// 3 from first insert of 18 rows in 1 table
-// 		// 1 from pre-pause
-// 		// 3 from second insert of 18 rows in 2 tables, batch size updated
-// 		require.GreaterOrEqual(s.t, len(workflowState.SyncFlowStatuses), 3+1+3)
-
-// 		env.CancelWorkflow()
-// 	}()
-
-// 	env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, config, nil)
-// }
+func (s PeerFlowE2ETestSuitePG) Test_Dynamic_Mirror_Config_Via_Signals() {
+	env := e2e.NewTemporalTestWorkflowEnvironment(s.t)
+	// the flow uses session activities, so the test worker must enable session support or it errors out
+	workerOptions := worker.Options{
+		EnableSessionWorker: true,
+	}
+	env.SetWorkerOptions(workerOptions)
+
+	srcTable1Name := s.attachSchemaSuffix("test_dynconfig_1")
+	srcTable2Name := s.attachSchemaSuffix("test_dynconfig_2")
+	dstTable1Name := s.attachSchemaSuffix("test_dynconfig_1_dst")
+	dstTable2Name := s.attachSchemaSuffix("test_dynconfig_2_dst")
+	sentPause := false
+	isPaused := false
+	sentUpdate := false
+
+	_, err := s.conn.Exec(context.Background(), fmt.Sprintf(`
+		CREATE TABLE IF NOT EXISTS %s (
+			id INT PRIMARY KEY GENERATED BY DEFAULT AS IDENTITY,
+			t TEXT DEFAULT md5(random()::text));
+		CREATE TABLE IF NOT EXISTS %s (
+			id INT PRIMARY KEY GENERATED BY DEFAULT AS IDENTITY,
+			t TEXT DEFAULT md5(random()::text));
+	`, srcTable1Name, srcTable2Name))
+	require.NoError(s.t, err)
+
+	connectionGen := e2e.FlowConnectionGenerationConfig{
+		FlowJobName: s.attachSuffix("test_dynconfig"),
+	}
+
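+	// small MaxBatchSize and IdleTimeoutSeconds values force the 18-row inserts to span multiple syncs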
+	config := &protos.FlowConnectionConfigs{
+		FlowJobName: connectionGen.FlowJobName,
+		Destination: s.peer,
+		TableMappings: []*protos.TableMapping{
+			{
+				SourceTableIdentifier:      srcTable1Name,
+				DestinationTableIdentifier: dstTable1Name,
+			},
+		},
+		Source:                      e2e.GeneratePostgresPeer(e2e.PostgresPort),
+		CdcStagingPath:              connectionGen.CdcStagingPath,
+		MaxBatchSize:                6,
+		IdleTimeoutSeconds:          7,
+		DoInitialSnapshot:           true,
+		SnapshotNumRowsPerPartition: 1000,
+		SnapshotMaxParallelWorkers:  1,
+		SnapshotNumTablesInParallel: 1,
+	}
+
+	addRows := func(numRows int) {
+		for i := 0; i < numRows; i++ {
+			_, err = s.conn.Exec(context.Background(),
+				fmt.Sprintf(`INSERT INTO %s DEFAULT VALUES`, srcTable1Name))
+			e2e.EnvNoError(s.t, env, err)
+			_, err = s.conn.Exec(context.Background(),
+				fmt.Sprintf(`INSERT INTO %s DEFAULT VALUES`, srcTable2Name))
+			e2e.EnvNoError(s.t, env, err)
+		}
+		s.t.Logf("Inserted %d rows into the source table", numRows)
+	}
+
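+	// getWorkFlowState queries the running workflow for its current CDCFlowWorkflowState.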
+	getWorkFlowState := func() peerflow.CDCFlowWorkflowState {
+		var workflowState peerflow.CDCFlowWorkflowState
+		val, err := env.QueryWorkflow(shared.CDCFlowStateQuery)
+		e2e.EnvNoError(s.t, env, err)
+		err = val.Get(&workflowState)
+		e2e.EnvNoError(s.t, env, err)
+
+		return workflowState
+	}
+
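+	// getFlowStatus queries the running workflow for its current FlowStatus.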
+	getFlowStatus := func() protos.FlowStatus {
+		var flowStatus protos.FlowStatus
+		val, err := env.QueryWorkflow(shared.FlowStatusQuery)
+		e2e.EnvNoError(s.t, env, err)
+		err = val.Get(&flowStatus)
+		e2e.EnvNoError(s.t, env, err)
+
+		return flowStatus
+	}
+
+	// signals in tests must be registered via RegisterDelayedCallback before the
+	// workflow starts, otherwise signaling errors out.
+	// too short a gap between signals also causes issues, which might have
+	// something to do with how test workflows fast-forward time.
+	env.RegisterDelayedCallback(func() {
+		env.SignalWorkflow(shared.FlowSignalName, shared.PauseSignal)
+		s.t.Log("Sent pause signal")
+		sentPause = true
+	}, 28*time.Second)
+	// this callback also sets isPaused, unblocking the "paused workflow" wait below
+	env.RegisterDelayedCallback(func() {
+		e2e.EnvWaitFor(s.t, env, 1*time.Minute, "send update signal after pause confirmed", func() bool {
+			flowStatus := getFlowStatus()
+			if flowStatus != protos.FlowStatus_STATUS_PAUSED {
+				return false
+			}
+			isPaused = true
+			env.SignalWorkflow(shared.CDCDynamicPropertiesSignalName, &protos.CDCFlowConfigUpdate{
+				IdleTimeout: 14,
+				BatchSize:   12,
+				AdditionalTables: []*protos.TableMapping{
+					{
+						SourceTableIdentifier:      srcTable2Name,
+						DestinationTableIdentifier: dstTable2Name,
+					},
+				},
+			})
+			s.t.Log("Sent update signal")
+			sentUpdate = true
+			return true
+		})
+	}, 56*time.Second)
+	env.RegisterDelayedCallback(func() {
+		e2e.EnvWaitFor(s.t, env, 1*time.Minute, "send resume signal after update confirmed", func() bool {
+			if !sentUpdate {
+				return false
+			}
+			env.SignalWorkflow(shared.FlowSignalName, shared.NoopSignal)
+			s.t.Log("Sent resume signal")
+			return true
+		})
+	}, 84*time.Second)
+
+	// insert rows before the workflow starts so the initial load is tested too.
+	addRows(18)
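+	// run the checks in a goroutine; env.ExecuteWorkflow below blocks until the workflow completes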
+	go func() {
+		defer env.CancelWorkflow()
+		e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen)
+		// insert 18 rows into the source tables, exactly 3 batches
+		addRows(18)
+
+		e2e.EnvWaitFor(s.t, env, 1*time.Minute, "normalize 18 records - first table", func() bool {
+			return s.comparePGTables(srcTable1Name, dstTable1Name, "id,t") == nil
+		})
+
+		workflowState := getWorkFlowState()
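+		// use assert rather than require here: require calls t.FailNow, which must not run outside the main test goroutine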
+		assert.EqualValues(s.t, 7, workflowState.SyncFlowOptions.IdleTimeoutSeconds)
+		assert.EqualValues(s.t, 6, workflowState.SyncFlowOptions.BatchSize)
+		assert.Len(s.t, workflowState.TableMappings, 1)
+		assert.Len(s.t, workflowState.SrcTableIdNameMapping, 1)
+		assert.Len(s.t, workflowState.TableNameSchemaMapping, 1)
+		// batch size is limited to 6, so at least 3 syncs are needed for 18 rows
+		assert.GreaterOrEqual(s.t, len(workflowState.SyncFlowStatuses), 3)
+
+		if !s.t.Failed() {
+			// wait for first RegisterDelayedCallback to hit.
+			e2e.EnvWaitFor(s.t, env, 1*time.Minute, "sent pause signal", func() bool {
+				return sentPause
+			})
+			e2e.EnvWaitFor(s.t, env, 1*time.Minute, "paused workflow", func() bool {
+				// keep adding one row at a time to guarantee another sync finishes
+				addRows(1)
+				// isPaused is set by the delayed callback that sends the update signal
+				return isPaused
+			})
+			e2e.EnvWaitFor(s.t, env, 1*time.Minute, "normalize 1 record - first table", func() bool {
+				return s.comparePGTables(srcTable1Name, dstTable1Name, "id,t") == nil
+			})
+
+			// we have a paused mirror, wait for second signal to hit.
+			e2e.EnvWaitFor(s.t, env, 1*time.Minute, "sent updates signal", func() bool {
+				return sentUpdate
+			})
+		}
+
+		// add rows to both tables while paused; the mirror should handle them on resume
+		addRows(18)
+
+		if !s.t.Failed() {
+			e2e.EnvWaitFor(s.t, env, 1*time.Minute, "resumed workflow", func() bool {
+				return getFlowStatus() == protos.FlowStatus_STATUS_RUNNING
+			})
+			e2e.EnvWaitFor(s.t, env, 1*time.Minute, "normalize 18 records - first table", func() bool {
+				return s.comparePGTables(srcTable1Name, dstTable1Name, "id,t") == nil
+			})
+			e2e.EnvWaitFor(s.t, env, 1*time.Minute, "initial load + normalize 18 records - second table", func() bool {
+				return s.comparePGTables(srcTable2Name, dstTable2Name, "id,t") == nil
+			})
+		}
+
+		if !s.t.Failed() {
+			workflowState = getWorkFlowState()
+			assert.EqualValues(s.t, 14, workflowState.SyncFlowOptions.IdleTimeoutSeconds)
+			assert.EqualValues(s.t, 12, workflowState.SyncFlowOptions.BatchSize)
+			assert.Len(s.t, workflowState.TableMappings, 2)
+			assert.Len(s.t, workflowState.SrcTableIdNameMapping, 2)
+			assert.Len(s.t, workflowState.TableNameSchemaMapping, 2)
+			// 3 from first insert of 18 rows in 1 table
+			// 1 from pre-pause
+			// 3 from second insert of 18 rows in 2 tables, batch size updated
+			assert.GreaterOrEqual(s.t, len(workflowState.SyncFlowStatuses), 3+1+3)
+		}
+	}()
+
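+	// blocks until the workflow completes, i.e. until the goroutine above cancels it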
+	env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, config, nil)
+	e2e.RequireEnvCanceled(s.t, env)
+}