Skip to content

Commit

Permalink
fixing review comments pt.1
Browse files Browse the repository at this point in the history
  • Loading branch information
heavycrystal committed Feb 8, 2024
1 parent 75575d5 commit fd12c95
Showing 1 changed file with 15 additions and 14 deletions.
29 changes: 15 additions & 14 deletions flow/e2e/postgres/peer_flow_pg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/jackc/pgerrcode"
"github.com/jackc/pgx/v5/pgconn"
"github.com/jackc/pgx/v5/pgtype"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.temporal.io/sdk/testsuite"
"go.temporal.io/sdk/worker"
Expand Down Expand Up @@ -1241,13 +1242,13 @@ func (s PeerFlowE2ETestSuitePG) Test_Dynamic_Mirror_Config_Via_Signals() {
})

workflowState := getWorkFlowState()
require.EqualValues(s.t, 7, workflowState.SyncFlowOptions.IdleTimeoutSeconds)
require.EqualValues(s.t, 6, workflowState.SyncFlowOptions.BatchSize)
require.EqualValues(s.t, 1, len(workflowState.TableMappings))
require.EqualValues(s.t, 1, len(workflowState.SrcTableIdNameMapping))
require.EqualValues(s.t, 1, len(workflowState.TableNameSchemaMapping))
assert.EqualValues(s.t, 5, workflowState.SyncFlowOptions.IdleTimeoutSeconds)
assert.EqualValues(s.t, 6, workflowState.SyncFlowOptions.BatchSize)
assert.EqualValues(s.t, 1, len(workflowState.TableMappings))
assert.EqualValues(s.t, 1, len(workflowState.SrcTableIdNameMapping))
assert.EqualValues(s.t, 1, len(workflowState.TableNameSchemaMapping))
// we have limited batch size to 6, so atleast 3 syncs needed
require.GreaterOrEqual(s.t, len(workflowState.SyncFlowStatuses), 3)
assert.GreaterOrEqual(s.t, len(workflowState.SyncFlowStatuses), 3)

// wait for first RegisterDelayedCallback to hit.
e2e.EnvWaitFor(s.t, env, 1*time.Minute, "sent pause signal", func() bool {
Expand All @@ -1272,8 +1273,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Dynamic_Mirror_Config_Via_Signals() {
addRows(18)

e2e.EnvWaitFor(s.t, env, 1*time.Minute, "resumed workflow", func() bool {
flowStatus := getFlowStatus()
return flowStatus == protos.FlowStatus_STATUS_RUNNING
return getFlowStatus() == protos.FlowStatus_STATUS_RUNNING
})
e2e.EnvWaitFor(s.t, env, 1*time.Minute, "normalize 18 records - first table", func() bool {
return s.comparePGTables(srcTable1Name, dstTable1Name, "id,t") == nil
Expand All @@ -1283,18 +1283,19 @@ func (s PeerFlowE2ETestSuitePG) Test_Dynamic_Mirror_Config_Via_Signals() {
})

workflowState = getWorkFlowState()
require.EqualValues(s.t, 14, workflowState.SyncFlowOptions.IdleTimeoutSeconds)
require.EqualValues(s.t, 12, workflowState.SyncFlowOptions.BatchSize)
require.EqualValues(s.t, 2, len(workflowState.TableMappings))
require.EqualValues(s.t, 2, len(workflowState.SrcTableIdNameMapping))
require.EqualValues(s.t, 2, len(workflowState.TableNameSchemaMapping))
assert.EqualValues(s.t, 14, workflowState.SyncFlowOptions.IdleTimeoutSeconds)
assert.EqualValues(s.t, 12, workflowState.SyncFlowOptions.BatchSize)
assert.EqualValues(s.t, 2, len(workflowState.TableMappings))
assert.EqualValues(s.t, 2, len(workflowState.SrcTableIdNameMapping))
assert.EqualValues(s.t, 2, len(workflowState.TableNameSchemaMapping))
// 3 from first insert of 18 rows in 1 table
// 1 from pre-pause
// 3 from second insert of 18 rows in 2 tables, batch size updated
require.GreaterOrEqual(s.t, len(workflowState.SyncFlowStatuses), 3+1+3)
assert.GreaterOrEqual(s.t, len(workflowState.SyncFlowStatuses), 3+1+3)

env.CancelWorkflow()
}()

env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, config, nil)
e2e.RequireEnvCanceled(s.t, env)
}

0 comments on commit fd12c95

Please sign in to comment.