diff --git a/flow/connectors/bigquery/merge_stmt_generator.go b/flow/connectors/bigquery/merge_stmt_generator.go
index db7b4c83af..55e6858b35 100644
--- a/flow/connectors/bigquery/merge_stmt_generator.go
+++ b/flow/connectors/bigquery/merge_stmt_generator.go
@@ -177,11 +177,11 @@ func (m *mergeStmtGenerator) generateMergeStmt(unchangedToastColumns []string) s
 		pkeySelectSQL, insertColumnsSQL, insertValuesSQL, updateStringToastCols, deletePart)
 }
 
-func (m *mergeStmtGenerator) generateMergeStmts(allUnchangedToastColas []string) []string {
+func (m *mergeStmtGenerator) generateMergeStmts(allUnchangedToastCols []string) []string {
 	// TODO (kaushik): This is so that the statement size for individual merge statements
 	// doesn't exceed the limit. We should make this configurable.
 	const batchSize = 8
-	partitions := utils.ArrayChunks(allUnchangedToastColas, batchSize)
+	partitions := utils.ArrayChunks(allUnchangedToastCols, batchSize)
 
 	mergeStmts := make([]string, 0, len(partitions))
 	for _, partition := range partitions {
diff --git a/flow/e2e/postgres/peer_flow_pg_test.go b/flow/e2e/postgres/peer_flow_pg_test.go
index 3efe19c48f..5517aa406f 100644
--- a/flow/e2e/postgres/peer_flow_pg_test.go
+++ b/flow/e2e/postgres/peer_flow_pg_test.go
@@ -1233,6 +1233,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Dynamic_Mirror_Config_Via_Signals() {
 	// add before to test initial load too.
 	addRows(18)
 	go func() {
+		defer env.CancelWorkflow()
 		e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen)
 		// insert 18 rows into the source tables, exactly 3 batches
 		addRows(18)
@@ -1244,56 +1245,60 @@
 		workflowState := getWorkFlowState()
 		assert.EqualValues(s.t, 7, workflowState.SyncFlowOptions.IdleTimeoutSeconds)
 		assert.EqualValues(s.t, 6, workflowState.SyncFlowOptions.BatchSize)
-		assert.EqualValues(s.t, 1, len(workflowState.TableMappings))
-		assert.EqualValues(s.t, 1, len(workflowState.SrcTableIdNameMapping))
-		assert.EqualValues(s.t, 1, len(workflowState.TableNameSchemaMapping))
+		assert.Len(s.t, workflowState.TableMappings, 2)
+		assert.Len(s.t, workflowState.SrcTableIdNameMapping, 1)
+		assert.Len(s.t, workflowState.TableNameSchemaMapping, 1)
 		// we have limited batch size to 6, so atleast 3 syncs needed
 		assert.GreaterOrEqual(s.t, len(workflowState.SyncFlowStatuses), 3)
 
-		// wait for first RegisterDelayedCallback to hit.
-		e2e.EnvWaitFor(s.t, env, 1*time.Minute, "sent pause signal", func() bool {
-			return sentPause
-		})
-		e2e.EnvWaitFor(s.t, env, 1*time.Minute, "paused workflow", func() bool {
-			// keep adding 1 more row - guarantee finishing another sync
-			addRows(1)
-			// isPaused - set from the WaitFor that sends update signal
-			return isPaused
-		})
-		e2e.EnvWaitFor(s.t, env, 1*time.Minute, "normalize 1 record - first table", func() bool {
-			return s.comparePGTables(srcTable1Name, dstTable1Name, "id,t") == nil
-		})
+		if !s.t.Failed() {
+			// wait for first RegisterDelayedCallback to hit.
+			e2e.EnvWaitFor(s.t, env, 1*time.Minute, "sent pause signal", func() bool {
+				return sentPause
+			})
+			e2e.EnvWaitFor(s.t, env, 1*time.Minute, "paused workflow", func() bool {
+				// keep adding 1 more row - guarantee finishing another sync
+				addRows(1)
+				// isPaused - set from the WaitFor that sends update signal
+				return isPaused
+			})
+			e2e.EnvWaitFor(s.t, env, 1*time.Minute, "normalize 1 record - first table", func() bool {
+				return s.comparePGTables(srcTable1Name, dstTable1Name, "id,t") == nil
+			})
 
-		// we have a paused mirror, wait for second signal to hit.
-		e2e.EnvWaitFor(s.t, env, 1*time.Minute, "sent updates signal", func() bool {
-			return sentUpdate
-		})
+			// we have a paused mirror, wait for second signal to hit.
+			e2e.EnvWaitFor(s.t, env, 1*time.Minute, "sent updates signal", func() bool {
+				return sentUpdate
+			})
+		}
 
 		// add rows to both tables before resuming - should handle
 		addRows(18)
 
-		e2e.EnvWaitFor(s.t, env, 1*time.Minute, "resumed workflow", func() bool {
-			return getFlowStatus() == protos.FlowStatus_STATUS_RUNNING
-		})
-		e2e.EnvWaitFor(s.t, env, 1*time.Minute, "normalize 18 records - first table", func() bool {
-			return s.comparePGTables(srcTable1Name, dstTable1Name, "id,t") == nil
-		})
-		e2e.EnvWaitFor(s.t, env, 1*time.Minute, "initial load + normalize 18 records - second table", func() bool {
-			return s.comparePGTables(srcTable2Name, dstTable2Name, "id,t") == nil
-		})
-
-		workflowState = getWorkFlowState()
-		assert.EqualValues(s.t, 14, workflowState.SyncFlowOptions.IdleTimeoutSeconds)
-		assert.EqualValues(s.t, 12, workflowState.SyncFlowOptions.BatchSize)
-		assert.EqualValues(s.t, 2, len(workflowState.TableMappings))
-		assert.EqualValues(s.t, 2, len(workflowState.SrcTableIdNameMapping))
-		assert.EqualValues(s.t, 2, len(workflowState.TableNameSchemaMapping))
-		// 3 from first insert of 18 rows in 1 table
-		// 1 from pre-pause
-		// 3 from second insert of 18 rows in 2 tables, batch size updated
-		assert.GreaterOrEqual(s.t, len(workflowState.SyncFlowStatuses), 3+1+3)
+		if !s.t.Failed() {
+			e2e.EnvWaitFor(s.t, env, 1*time.Minute, "resumed workflow", func() bool {
+				return getFlowStatus() == protos.FlowStatus_STATUS_RUNNING
+			})
+			e2e.EnvWaitFor(s.t, env, 1*time.Minute, "normalize 18 records - first table", func() bool {
+				return s.comparePGTables(srcTable1Name, dstTable1Name, "id,t") == nil
+			})
+			e2e.EnvWaitFor(s.t, env, 1*time.Minute, "initial load + normalize 18 records - second table", func() bool {
+				return s.comparePGTables(srcTable2Name, dstTable2Name, "id,t") == nil
+			})
+		}
 
-		env.CancelWorkflow()
+		if !s.t.Failed() {
+			workflowState = getWorkFlowState()
+			assert.EqualValues(s.t, 14, workflowState.SyncFlowOptions.IdleTimeoutSeconds)
+			assert.EqualValues(s.t, 12, workflowState.SyncFlowOptions.BatchSize)
+			assert.Len(s.t, workflowState.TableMappings, 2)
+			assert.Len(s.t, workflowState.SrcTableIdNameMapping, 2)
+			assert.Len(s.t, workflowState.TableNameSchemaMapping, 2)
+			// 3 from first insert of 18 rows in 1 table
+			// 1 from pre-pause
+			// 3 from second insert of 18 rows in 2 tables, batch size updated
+			assert.GreaterOrEqual(s.t, len(workflowState.SyncFlowStatuses), 3+1+3)
+		}
 	}()
 
 	env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, config, nil)
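
A note on the first hunk, beyond the `allUnchangedToastColas` → `allUnchangedToastCols` rename: `generateMergeStmts` splits the unchanged-TOAST-column combinations into chunks of 8 so no single generated MERGE statement exceeds BigQuery's statement-size limit. Below is a minimal sketch of what a generic chunking helper like `utils.ArrayChunks` could look like; the real implementation in `flow/connectors/utils` may differ, and `arrayChunks` here is a hypothetical stand-in for illustration only.

```go
package main

import "fmt"

// arrayChunks splits a slice into consecutive chunks of at most size
// elements; the final chunk may be shorter. Hypothetical stand-in for
// utils.ArrayChunks as used by generateMergeStmts.
func arrayChunks[T any](items []T, size int) [][]T {
	if size <= 0 || len(items) == 0 {
		return nil
	}
	chunks := make([][]T, 0, (len(items)+size-1)/size)
	for size < len(items) {
		chunks = append(chunks, items[:size])
		items = items[size:]
	}
	return append(chunks, items)
}

func main() {
	// Each entry is one comma-joined set of unchanged TOAST columns.
	cols := []string{"c1", "c2", "c1,c2", "c3", "c1,c3", "c2,c3", "c1,c2,c3", "c4", "c1,c4"}
	// batchSize = 8 in generateMergeStmts, so 9 entries become 2 chunks.
	for _, chunk := range arrayChunks(cols, 8) {
		fmt.Println(len(chunk), chunk)
	}
}
```

With the last chunk allowed to run short, `len(partitions)` is `ceil(len(cols)/batchSize)`, which is why `mergeStmts` is preallocated with capacity `len(partitions)` before the loop emits one merge statement per chunk.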
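The test changes all serve one pattern: `env.CancelWorkflow()` moves from the tail of the goroutine into a `defer`, so the workflow is cancelled even when an assertion fails partway through, and the later waits are wrapped in `if !s.t.Failed()` so a failed step short-circuits instead of spending the full minute in each doomed `EnvWaitFor`. A self-contained sketch of the pattern against the standard `testing` package, where `waitFor` is a hypothetical stand-in for `e2e.EnvWaitFor` and `cancel` for `env.CancelWorkflow`:

```go
package example

import (
	"testing"
	"time"
)

// waitFor polls cond until it returns true or the timeout elapses,
// marking the test failed on timeout. Simplified stand-in for e2e.EnvWaitFor.
func waitFor(t *testing.T, timeout time.Duration, reason string, cond func() bool) {
	deadline := time.Now().Add(timeout)
	for !cond() {
		if time.Now().After(deadline) {
			t.Errorf("timed out waiting for %s", reason)
			return
		}
		time.Sleep(10 * time.Millisecond)
	}
}

func TestGuardedWaits(t *testing.T) {
	cancel := func() { /* stand-in for env.CancelWorkflow() */ }

	done := make(chan struct{})
	go func() {
		defer close(done)
		// Tear down unconditionally: with a plain cancel() at the end of the
		// goroutine, an early failure skips it and leaves the workflow
		// running until the suite-level timeout.
		defer cancel()

		waitFor(t, time.Second, "first condition", func() bool { return true })

		// Skip the remaining waits once something has already failed, so a
		// broken step fails fast instead of waiting out every later timeout.
		if !t.Failed() {
			waitFor(t, time.Second, "second condition", func() bool { return true })
		}
	}()
	<-done
}
```

The switch from `assert.EqualValues(s.t, n, len(x))` to `assert.Len(s.t, x, n)` is behavior-preserving; testify's `Len` simply gives a clearer failure message that includes the collection being measured.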