Skip to content

Commit

Permalink
dynamic properties test (#1216)
Browse files Browse the repository at this point in the history
also ends up testing initial load
  • Loading branch information
heavycrystal authored Feb 12, 2024
1 parent e5eb347 commit 79915cf
Showing 1 changed file with 201 additions and 177 deletions.
378 changes: 201 additions & 177 deletions flow/e2e/postgres/peer_flow_pg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,16 @@ import (
"github.com/jackc/pgerrcode"
"github.com/jackc/pgx/v5/pgconn"
"github.com/jackc/pgx/v5/pgtype"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.temporal.io/sdk/testsuite"
"go.temporal.io/sdk/worker"

"github.com/PeerDB-io/peer-flow/connectors/utils"
"github.com/PeerDB-io/peer-flow/e2e"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/model/qvalue"
"github.com/PeerDB-io/peer-flow/shared"
peerflow "github.com/PeerDB-io/peer-flow/workflows"
)

Expand Down Expand Up @@ -1103,180 +1106,201 @@ func (s PeerFlowE2ETestSuitePG) Test_Supported_Mixed_Case_Table() {

// test doesn't work, make it work later

// func (s PeerFlowE2ETestSuitePG) Test_Dynamic_Mirror_Config_Via_Signals() {
// env := e2e.NewTemporalTestWorkflowEnvironment(s.t)
// // needed otherwise errors out
// workerOptions := worker.Options{
// EnableSessionWorker: true,
// }
// env.SetWorkerOptions(workerOptions)

// srcTable1Name := s.attachSchemaSuffix("test_dynconfig_1")
// srcTable2Name := s.attachSchemaSuffix("test_dynconfig_2")
// dstTable1Name := s.attachSchemaSuffix("test_dynconfig_1_dst")
// dstTable2Name := s.attachSchemaSuffix("test_dynconfig_2_dst")
// sentPause := false
// sentUpdate := false

// // signals in tests are weird, you need to register them before starting the workflow
// // otherwise you guessed it, errors out. really don't like this.
// // too short of a gap between signals also causes issues
// // might have something to do with how test workflows handle fast-forwarding time.
// env.RegisterDelayedCallback(func() {
// env.SignalWorkflow(shared.FlowSignalName, shared.PauseSignal)
// s.t.Log("Sent pause signal")
// sentPause = true
// }, 28*time.Second)
// env.RegisterDelayedCallback(func() {
// env.SignalWorkflow(shared.CDCDynamicPropertiesSignalName, &protos.CDCFlowConfigUpdate{
// IdleTimeout: 14,
// BatchSize: 12,
// AdditionalTables: []*protos.TableMapping{
// {
// SourceTableIdentifier: srcTable2Name,
// DestinationTableIdentifier: dstTable2Name,
// },
// },
// })
// s.t.Log("Sent update signal")
// sentUpdate = true
// }, 56*time.Second)
// env.RegisterDelayedCallback(func() {
// env.SignalWorkflow(shared.FlowSignalName, shared.NoopSignal)
// s.t.Log("Sent resume signal")
// }, 84*time.Second)

// _, err := s.conn.Exec(context.Background(), fmt.Sprintf(`
// CREATE TABLE IF NOT EXISTS %s (
// id INT PRIMARY KEY GENERATED BY DEFAULT AS IDENTITY,
// t TEXT DEFAULT md5(random()::text));
// CREATE TABLE IF NOT EXISTS %s (
// id INT PRIMARY KEY GENERATED BY DEFAULT AS IDENTITY,
// t TEXT DEFAULT md5(random()::text));
// `, srcTable1Name, srcTable2Name))
// require.NoError(s.t, err)

// connectionGen := e2e.FlowConnectionGenerationConfig{
// FlowJobName: s.attachSuffix("test_dynconfig"),
// }

// config := &protos.FlowConnectionConfigs{
// FlowJobName: connectionGen.FlowJobName,
// Destination: s.peer,
// TableMappings: []*protos.TableMapping{
// {
// SourceTableIdentifier: srcTable1Name,
// DestinationTableIdentifier: dstTable1Name,
// },
// },
// Source: e2e.GeneratePostgresPeer(e2e.PostgresPort),
// CdcStagingPath: connectionGen.CdcStagingPath,
// MaxBatchSize: 6,
// IdleTimeoutSeconds: 7,
// DoInitialSnapshot: true,
// SnapshotNumRowsPerPartition: 1000,
// SnapshotMaxParallelWorkers: 1,
// SnapshotNumTablesInParallel: 1,
// }

// addRows := func(numRows int) {
// for i := 0; i < numRows; i++ {
// _, err = s.conn.Exec(context.Background(),
// fmt.Sprintf(`INSERT INTO %s DEFAULT VALUES`, srcTable1Name))
// e2e.EnvNoError(s.t, env, err)
// _, err = s.conn.Exec(context.Background(),
// fmt.Sprintf(`INSERT INTO %s DEFAULT VALUES`, srcTable2Name))
// e2e.EnvNoError(s.t, env, err)
// }
// s.t.Logf("Inserted %d rows into the source table", numRows)
// }

// getWorkFlowState := func() peerflow.CDCFlowWorkflowState {
// var workflowState peerflow.CDCFlowWorkflowState
// val, err := env.QueryWorkflow(shared.CDCFlowStateQuery)
// e2e.EnvNoError(s.t, env, err)
// err = val.Get(&workflowState)
// e2e.EnvNoError(s.t, env, err)

// return workflowState
// }

// getFlowStatus := func() protos.FlowStatus {
// var flowStatus protos.FlowStatus
// val, err := env.QueryWorkflow(shared.FlowStatusQuery)
// e2e.EnvNoError(s.t, env, err)
// err = val.Get(&flowStatus)
// e2e.EnvNoError(s.t, env, err)

// return flowStatus
// }

// // add before to test initial load too.
// addRows(18)
// go func() {
// e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen)
// // insert 18 rows into the source tables, exactly 3 batches
// addRows(18)

// e2e.EnvWaitFor(s.t, env, 1*time.Minute, "normalize 18 records - first table", func() bool {
// return s.comparePGTables(srcTable1Name, dstTable1Name, "id,t") == nil
// })

// workflowState := getWorkFlowState()
// require.EqualValues(s.t, 7, workflowState.SyncFlowOptions.IdleTimeoutSeconds)
// require.EqualValues(s.t, 6, workflowState.SyncFlowOptions.BatchSize)
// require.EqualValues(s.t, 1, len(workflowState.TableMappings))
// require.EqualValues(s.t, 1, len(workflowState.SrcTableIdNameMapping))
// require.EqualValues(s.t, 1, len(workflowState.TableNameSchemaMapping))
// // we have limited batch size to 6, so atleast 3 syncs needed
// require.GreaterOrEqual(s.t, len(workflowState.SyncFlowStatuses), 3)

// // wait for first RegisterDelayedCallback to hit.
// e2e.EnvWaitFor(s.t, env, 1*time.Minute, "sent pause signal", func() bool {
// return sentPause
// })
// e2e.EnvWaitFor(s.t, env, 1*time.Minute, "paused workflow", func() bool {
// // keep adding 1 more row - guarantee finishing another sync
// addRows(1)
// flowStatus := getFlowStatus()
// return flowStatus == protos.FlowStatus_STATUS_PAUSED
// })
// e2e.EnvWaitFor(s.t, env, 1*time.Minute, "normalize 1 record - first table", func() bool {
// return s.comparePGTables(srcTable1Name, dstTable1Name, "id,t") == nil
// })

// // we have a paused mirror, wait for second signal to hit.
// e2e.EnvWaitFor(s.t, env, 1*time.Minute, "sent updates signal", func() bool {
// return sentUpdate
// })

// // add rows to both tables before resuming - should handle
// addRows(18)

// e2e.EnvWaitFor(s.t, env, 1*time.Minute, "resumed workflow", func() bool {
// flowStatus := getFlowStatus()
// return flowStatus == protos.FlowStatus_STATUS_RUNNING
// })
// e2e.EnvWaitFor(s.t, env, 1*time.Minute, "normalize 18 records - first table", func() bool {
// return s.comparePGTables(srcTable1Name, dstTable1Name, "id,t") == nil
// })
// e2e.EnvWaitFor(s.t, env, 1*time.Minute, "initial load + normalize 18 records - second table", func() bool {
// return s.comparePGTables(srcTable2Name, dstTable2Name, "id,t") == nil
// })

// workflowState = getWorkFlowState()
// require.EqualValues(s.t, 14, workflowState.SyncFlowOptions.IdleTimeoutSeconds)
// require.EqualValues(s.t, 12, workflowState.SyncFlowOptions.BatchSize)
// require.EqualValues(s.t, 2, len(workflowState.TableMappings))
// require.EqualValues(s.t, 2, len(workflowState.SrcTableIdNameMapping))
// require.EqualValues(s.t, 2, len(workflowState.TableNameSchemaMapping))
// // 3 from first insert of 18 rows in 1 table
// // 1 from pre-pause
// // 3 from second insert of 18 rows in 2 tables, batch size updated
// require.GreaterOrEqual(s.t, len(workflowState.SyncFlowStatuses), 3+1+3)

// env.CancelWorkflow()
// }()

// env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, config, nil)
// }
// Test_Dynamic_Mirror_Config_Via_Signals exercises runtime reconfiguration of a
// running CDC mirror via Temporal signals: pause the flow, send a dynamic
// properties update (new idle timeout, batch size, and one additional table),
// then resume, verifying replicated data and workflow state at each stage.
// Because rows are inserted before the workflow starts, this also ends up
// covering the initial snapshot load path.
func (s PeerFlowE2ETestSuitePG) Test_Dynamic_Mirror_Config_Via_Signals() {
	env := e2e.NewTemporalTestWorkflowEnvironment(s.t)
	// EnableSessionWorker is required here; the workflow errors out otherwise.
	workerOptions := worker.Options{
		EnableSessionWorker: true,
	}
	env.SetWorkerOptions(workerOptions)

	srcTable1Name := s.attachSchemaSuffix("test_dynconfig_1")
	srcTable2Name := s.attachSchemaSuffix("test_dynconfig_2")
	dstTable1Name := s.attachSchemaSuffix("test_dynconfig_1_dst")
	dstTable2Name := s.attachSchemaSuffix("test_dynconfig_2_dst")
	// Flags written by the delayed callbacks below and polled by the
	// verification goroutine; the Temporal test env fast-forwards time, so
	// these coordinate "signal sent" / "pause observed" across the two sides.
	sentPause := false
	isPaused := false
	sentUpdate := false

	// Create both source tables up front; table 2 is only mirrored after the
	// dynamic-properties update adds it mid-flow.
	_, err := s.conn.Exec(context.Background(), fmt.Sprintf(`
	CREATE TABLE IF NOT EXISTS %s (
		id INT PRIMARY KEY GENERATED BY DEFAULT AS IDENTITY,
		t TEXT DEFAULT md5(random()::text));
	CREATE TABLE IF NOT EXISTS %s (
		id INT PRIMARY KEY GENERATED BY DEFAULT AS IDENTITY,
		t TEXT DEFAULT md5(random()::text));
	`, srcTable1Name, srcTable2Name))
	require.NoError(s.t, err)

	connectionGen := e2e.FlowConnectionGenerationConfig{
		FlowJobName: s.attachSuffix("test_dynconfig"),
	}

	config := &protos.FlowConnectionConfigs{
		FlowJobName: connectionGen.FlowJobName,
		Destination: s.peer,
		TableMappings: []*protos.TableMapping{
			{
				SourceTableIdentifier:      srcTable1Name,
				DestinationTableIdentifier: dstTable1Name,
			},
		},
		Source:         e2e.GeneratePostgresPeer(e2e.PostgresPort),
		CdcStagingPath: connectionGen.CdcStagingPath,
		// Deliberately small initial batch size / idle timeout so the first 18
		// inserted rows require multiple sync batches; the update signal later
		// changes these to 12 / 14 and the test asserts both values.
		MaxBatchSize:                6,
		IdleTimeoutSeconds:          7,
		DoInitialSnapshot:           true,
		SnapshotNumRowsPerPartition: 1000,
		SnapshotMaxParallelWorkers:  1,
		SnapshotNumTablesInParallel: 1,
	}

	// addRows inserts numRows default-valued rows into BOTH source tables,
	// even while only table 1 is part of the mirror.
	addRows := func(numRows int) {
		for i := 0; i < numRows; i++ {
			_, err = s.conn.Exec(context.Background(),
				fmt.Sprintf(`INSERT INTO %s DEFAULT VALUES`, srcTable1Name))
			e2e.EnvNoError(s.t, env, err)
			_, err = s.conn.Exec(context.Background(),
				fmt.Sprintf(`INSERT INTO %s DEFAULT VALUES`, srcTable2Name))
			e2e.EnvNoError(s.t, env, err)
		}
		s.t.Logf("Inserted %d rows into the source table", numRows)
	}

	// getWorkFlowState queries the running workflow for its full CDC state
	// (sync options, table mappings, per-sync statuses).
	getWorkFlowState := func() peerflow.CDCFlowWorkflowState {
		var workflowState peerflow.CDCFlowWorkflowState
		val, err := env.QueryWorkflow(shared.CDCFlowStateQuery)
		e2e.EnvNoError(s.t, env, err)
		err = val.Get(&workflowState)
		e2e.EnvNoError(s.t, env, err)

		return workflowState
	}

	// getFlowStatus queries just the flow's status enum (RUNNING/PAUSED/...).
	getFlowStatus := func() protos.FlowStatus {
		var flowStatus protos.FlowStatus
		val, err := env.QueryWorkflow(shared.FlowStatusQuery)
		e2e.EnvNoError(s.t, env, err)
		err = val.Get(&flowStatus)
		e2e.EnvNoError(s.t, env, err)

		return flowStatus
	}

	// signals in tests are weird, you need to register them before starting the workflow
	// otherwise you guessed it, errors out. really don't like this.
	// too short of a gap between signals also causes issues --
	// might have something to do with how test workflows handle fast-forwarding time.
	env.RegisterDelayedCallback(func() {
		env.SignalWorkflow(shared.FlowSignalName, shared.PauseSignal)
		s.t.Log("Sent pause signal")
		sentPause = true
	}, 28*time.Second)
	// this signal being sent also unblocks another WaitFor:
	// the update is only sent once the workflow is confirmed paused, and
	// isPaused is the flag the verification goroutine waits on.
	env.RegisterDelayedCallback(func() {
		e2e.EnvWaitFor(s.t, env, 1*time.Minute, "send update signal after pause confirmed", func() bool {
			flowStatus := getFlowStatus()
			if flowStatus != protos.FlowStatus_STATUS_PAUSED {
				return false
			}
			isPaused = true
			env.SignalWorkflow(shared.CDCDynamicPropertiesSignalName, &protos.CDCFlowConfigUpdate{
				IdleTimeout: 14,
				BatchSize:   12,
				AdditionalTables: []*protos.TableMapping{
					{
						SourceTableIdentifier:      srcTable2Name,
						DestinationTableIdentifier: dstTable2Name,
					},
				},
			})
			s.t.Log("Sent update signal")
			sentUpdate = true
			return true
		})
	}, 56*time.Second)
	// Resume only after the update signal has gone out, so the new config is
	// applied before the flow starts syncing again.
	env.RegisterDelayedCallback(func() {
		e2e.EnvWaitFor(s.t, env, 1*time.Minute, "send resume signal after update confirmed", func() bool {
			if !sentUpdate {
				return false
			}
			env.SignalWorkflow(shared.FlowSignalName, shared.NoopSignal)
			s.t.Log("Sent resume signal")
			return true
		})
	}, 84*time.Second)

	// add before to test initial load too.
	addRows(18)
	go func() {
		// Always cancel so ExecuteWorkflow below returns even on assertion failure.
		defer env.CancelWorkflow()
		e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen)
		// insert 18 rows into the source tables, exactly 3 batches
		addRows(18)

		e2e.EnvWaitFor(s.t, env, 1*time.Minute, "normalize 18 records - first table", func() bool {
			return s.comparePGTables(srcTable1Name, dstTable1Name, "id,t") == nil
		})

		// Pre-update state: original batch size / idle timeout, one table mapped.
		// assert (not require) so a failure still falls through to CancelWorkflow.
		workflowState := getWorkFlowState()
		assert.EqualValues(s.t, 7, workflowState.SyncFlowOptions.IdleTimeoutSeconds)
		assert.EqualValues(s.t, 6, workflowState.SyncFlowOptions.BatchSize)
		assert.Len(s.t, workflowState.TableMappings, 1)
		assert.Len(s.t, workflowState.SrcTableIdNameMapping, 1)
		assert.Len(s.t, workflowState.TableNameSchemaMapping, 1)
		// we have limited batch size to 6, so at least 3 syncs needed
		assert.GreaterOrEqual(s.t, len(workflowState.SyncFlowStatuses), 3)

		if !s.t.Failed() {
			// wait for first RegisterDelayedCallback to hit.
			e2e.EnvWaitFor(s.t, env, 1*time.Minute, "sent pause signal", func() bool {
				return sentPause
			})
			e2e.EnvWaitFor(s.t, env, 1*time.Minute, "paused workflow", func() bool {
				// keep adding 1 more row - guarantee finishing another sync
				addRows(1)
				// isPaused - set from the WaitFor that sends update signal
				return isPaused
			})
			e2e.EnvWaitFor(s.t, env, 1*time.Minute, "normalize 1 record - first table", func() bool {
				return s.comparePGTables(srcTable1Name, dstTable1Name, "id,t") == nil
			})

			// we have a paused mirror, wait for second signal to hit.
			e2e.EnvWaitFor(s.t, env, 1*time.Minute, "sent updates signal", func() bool {
				return sentUpdate
			})
		}

		// add rows to both tables before resuming - should handle
		addRows(18)

		if !s.t.Failed() {
			e2e.EnvWaitFor(s.t, env, 1*time.Minute, "resumed workflow", func() bool {
				return getFlowStatus() == protos.FlowStatus_STATUS_RUNNING
			})
			e2e.EnvWaitFor(s.t, env, 1*time.Minute, "normalize 18 records - first table", func() bool {
				return s.comparePGTables(srcTable1Name, dstTable1Name, "id,t") == nil
			})
			// table 2 must be fully backfilled (initial load) plus the new rows,
			// since it was added to the mirror only by the update signal.
			e2e.EnvWaitFor(s.t, env, 1*time.Minute, "initial load + normalize 18 records - second table", func() bool {
				return s.comparePGTables(srcTable2Name, dstTable2Name, "id,t") == nil
			})
		}

		if !s.t.Failed() {
			// Post-update state: new batch size / idle timeout, both tables mapped.
			workflowState = getWorkFlowState()
			assert.EqualValues(s.t, 14, workflowState.SyncFlowOptions.IdleTimeoutSeconds)
			assert.EqualValues(s.t, 12, workflowState.SyncFlowOptions.BatchSize)
			assert.Len(s.t, workflowState.TableMappings, 2)
			assert.Len(s.t, workflowState.SrcTableIdNameMapping, 2)
			assert.Len(s.t, workflowState.TableNameSchemaMapping, 2)
			// 3 from first insert of 18 rows in 1 table
			// 1 from pre-pause
			// 3 from second insert of 18 rows in 2 tables, batch size updated
			assert.GreaterOrEqual(s.t, len(workflowState.SyncFlowStatuses), 3+1+3)
		}
	}()

	env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, config, nil)
	// Workflow ends via CancelWorkflow above; cancellation is the expected terminal state.
	e2e.RequireEnvCanceled(s.t, env)
}

0 comments on commit 79915cf

Please sign in to comment.