diff --git a/flow/connectors/bigquery/bigquery.go b/flow/connectors/bigquery/bigquery.go index 00f608857c..5f966ecf7f 100644 --- a/flow/connectors/bigquery/bigquery.go +++ b/flow/connectors/bigquery/bigquery.go @@ -966,7 +966,7 @@ func (c *BigQueryConnector) SetupNormalizedTables( } // convert the column names and types to bigquery types - columns := make([]*bigquery.FieldSchema, len(tableSchema.Columns)) + columns := make([]*bigquery.FieldSchema, len(tableSchema.Columns), len(tableSchema.Columns)+2) idx := 0 for colName, genericColType := range tableSchema.Columns { columns[idx] = &bigquery.FieldSchema{ diff --git a/flow/connectors/postgres/client.go b/flow/connectors/postgres/client.go index 0a79810530..9aa05131c7 100644 --- a/flow/connectors/postgres/client.go +++ b/flow/connectors/postgres/client.go @@ -352,7 +352,7 @@ func generateCreateTableSQLForNormalizedTable( softDeleteColName string, syncedAtColName string, ) string { - createTableSQLArray := make([]string, 0, len(sourceTableSchema.Columns)) + createTableSQLArray := make([]string, 0, len(sourceTableSchema.Columns)+2) for columnName, genericColumnType := range sourceTableSchema.Columns { createTableSQLArray = append(createTableSQLArray, fmt.Sprintf("\"%s\" %s,", columnName, qValueKindToPostgresType(genericColumnType))) diff --git a/flow/connectors/snowflake/snowflake.go b/flow/connectors/snowflake/snowflake.go index d996657d5d..f92ed3e33e 100644 --- a/flow/connectors/snowflake/snowflake.go +++ b/flow/connectors/snowflake/snowflake.go @@ -751,7 +751,7 @@ func generateCreateTableSQLForNormalizedTable( softDeleteColName string, syncedAtColName string, ) string { - createTableSQLArray := make([]string, 0, len(sourceTableSchema.Columns)) + createTableSQLArray := make([]string, 0, len(sourceTableSchema.Columns)+2) for columnName, genericColumnType := range sourceTableSchema.Columns { columnNameUpper := strings.ToUpper(columnName) sfColType, err := qValueKindToSnowflakeType(qvalue.QValueKind(genericColumnType)) diff --git a/flow/e2e/bigquery/peer_flow_bq_test.go b/flow/e2e/bigquery/peer_flow_bq_test.go index 0473f26945..30e203aeba 100644 --- a/flow/e2e/bigquery/peer_flow_bq_test.go +++ b/flow/e2e/bigquery/peer_flow_bq_test.go @@ -52,9 +52,10 @@ func (s PeerFlowE2ETestSuiteBQ) attachSuffix(input string) string { return fmt.Sprintf("%s_%s", input, s.bqSuffix) } -func (s *PeerFlowE2ETestSuiteBQ) checkPeerdbColumns(dstSchemaQualified string, rowID int8) error { +func (s *PeerFlowE2ETestSuiteBQ) checkPeerdbColumns(dstQualified string, rowID int8) error { + qualifiedTableName := fmt.Sprintf("`%s.%s`", s.bqHelper.Config.DatasetId, dstQualified) query := fmt.Sprintf("SELECT `_PEERDB_IS_DELETED`,`_PEERDB_SYNCED_AT` FROM %s WHERE id = %d", - dstSchemaQualified, rowID) + qualifiedTableName, rowID) recordBatch, err := s.bqHelper.ExecuteAndProcessQuery(query) if err != nil { @@ -456,7 +457,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Nochanges_BQ() { // allow only continue as new error require.Contains(s.t, err.Error(), "continue as new") - s.compareTableContentsBQ(dstTableName, "id,t1,t2,k,_PEERDB_IS_DELETED, _PEERDB_SYNCED_AT") + s.compareTableContentsBQ(dstTableName, "id,t1,t2,k") env.AssertExpectations(s.t) <-done } @@ -1133,13 +1134,12 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_2_BQ() { env.AssertExpectations(s.t) } -func (s *PeerFlowE2ETestSuiteBQ) Test_PeerDB_Columns() { +func (s PeerFlowE2ETestSuiteBQ) Test_Columns_BQ() { env := e2e.NewTemporalTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env, s.t) srcTableName := s.attachSchemaSuffix("test_peerdb_cols") - dstTableName := s.attachSchemaSuffix("test_peerdb_cols_dst") - + dstTableName := "test_peerdb_cols_dst" _, err := s.pool.Exec(context.Background(), fmt.Sprintf(` CREATE TABLE IF NOT EXISTS %s ( id SERIAL PRIMARY KEY, @@ -1154,6 +1154,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_PeerDB_Columns() { TableNameMapping: map[string]string{srcTableName: dstTableName}, PostgresPort: e2e.PostgresPort, Destination: s.bqHelper.Peer, + SoftDelete: true, } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() @@ -1179,7 +1180,6 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_PeerDB_Columns() { DELETE FROM %s WHERE id=1 `, srcTableName)) require.NoError(s.t, err) - fmt.Println("Inserted 10 rows into the source table") }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) @@ -1189,7 +1189,6 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_PeerDB_Columns() { err = env.GetWorkflowError() // allow only continue as new error - s.Error(err) require.Contains(s.t, err.Error(), "continue as new") err = s.checkPeerdbColumns(dstTableName, 1) diff --git a/flow/e2e/congen.go b/flow/e2e/congen.go index b71ad726a9..e881dd5ead 100644 --- a/flow/e2e/congen.go +++ b/flow/e2e/congen.go @@ -171,6 +171,7 @@ type FlowConnectionGenerationConfig struct { PostgresPort int Destination *protos.Peer CdcStagingPath string + SoftDelete bool } // GenerateSnowflakePeer generates a snowflake peer config for testing. @@ -201,8 +202,10 @@ func (c *FlowConnectionGenerationConfig) GenerateFlowConnectionConfigs() (*proto ret.Source = GeneratePostgresPeer(c.PostgresPort) ret.Destination = c.Destination ret.CdcStagingPath = c.CdcStagingPath - ret.SoftDelete = true - ret.SoftDeleteColName = "_PEERDB_IS_DELETED" + ret.SoftDelete = c.SoftDelete + if ret.SoftDelete { + ret.SoftDeleteColName = "_PEERDB_IS_DELETED" + } ret.SyncedAtColName = "_PEERDB_SYNCED_AT" return ret, nil } diff --git a/flow/e2e/postgres/peer_flow_pg_test.go b/flow/e2e/postgres/peer_flow_pg_test.go index f805fc9eba..da050ccf64 100644 --- a/flow/e2e/postgres/peer_flow_pg_test.go +++ b/flow/e2e/postgres/peer_flow_pg_test.go @@ -518,6 +518,7 @@ func (s *PeerFlowE2ETestSuitePG) Test_PeerDB_Columns() { TableNameMapping: map[string]string{srcTableName: dstTableName}, PostgresPort: e2e.PostgresPort, Destination: s.peer, + SoftDelete: true, } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() diff --git a/flow/e2e/snowflake/peer_flow_sf_test.go b/flow/e2e/snowflake/peer_flow_sf_test.go index 3e6f0c2bc0..e5bd22464c 100644 --- a/flow/e2e/snowflake/peer_flow_sf_test.go +++ b/flow/e2e/snowflake/peer_flow_sf_test.go @@ -1176,8 +1176,9 @@ func (s PeerFlowE2ETestSuiteSF) Test_Column_Exclusion() { Exclude: []string{"c2"}, }, }, - Source: e2e.GeneratePostgresPeer(e2e.PostgresPort), - CdcStagingPath: connectionGen.CdcStagingPath, + Source: e2e.GeneratePostgresPeer(e2e.PostgresPort), + CdcStagingPath: connectionGen.CdcStagingPath, + SyncedAtColName: "_PEERDB_SYNCED_AT", } limits := peerflow.CDCFlowLimits{ @@ -1221,7 +1222,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Column_Exclusion() { for _, field := range sfRows.Schema.Fields { require.NotEqual(s.t, field.Name, "c2") } - s.Equal(4, len(sfRows.Schema.Fields)) + s.Equal(5, len(sfRows.Schema.Fields)) s.Equal(10, len(sfRows.Records)) }