diff --git a/.github/workflows/flow.yml b/.github/workflows/flow.yml index 777b23039c..3e3a4a3c77 100644 --- a/.github/workflows/flow.yml +++ b/.github/workflows/flow.yml @@ -126,4 +126,3 @@ jobs: PEERDB_CATALOG_USER: postgres PEERDB_CATALOG_PASSWORD: postgres PEERDB_CATALOG_DATABASE: postgres - PEERDB_CDC_IDLE_TIMEOUT_SECONDS: 10 diff --git a/flow/e2e/bigquery/peer_flow_bq_test.go b/flow/e2e/bigquery/peer_flow_bq_test.go index c8074484ad..d17e4c6b26 100644 --- a/flow/e2e/bigquery/peer_flow_bq_test.go +++ b/flow/e2e/bigquery/peer_flow_bq_test.go @@ -228,8 +228,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Complete_Flow_No_Data() { CdcStagingPath: "", } - flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - require.NoError(s.t, err) + flowConnConfig := connectionGen.GenerateFlowConnectionConfigs() limits := peerflow.CDCFlowLimits{ ExitAfterRecords: 0, @@ -270,8 +269,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Char_ColType_Error() { CdcStagingPath: "", } - flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - require.NoError(s.t, err) + flowConnConfig := connectionGen.GenerateFlowConnectionConfigs() limits := peerflow.CDCFlowLimits{ ExitAfterRecords: 0, @@ -315,8 +313,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Complete_Simple_Flow_BQ() { CdcStagingPath: "", } - flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - require.NoError(s.t, err) + flowConnConfig := connectionGen.GenerateFlowConnectionConfigs() limits := peerflow.CDCFlowLimits{ ExitAfterRecords: 10, @@ -381,8 +378,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Toast_BQ() { CdcStagingPath: "", } - flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - require.NoError(s.t, err) + flowConnConfig := connectionGen.GenerateFlowConnectionConfigs() limits := peerflow.CDCFlowLimits{ ExitAfterRecords: 4, @@ -448,8 +444,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Nochanges_BQ() { CdcStagingPath: "", } - flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - require.NoError(s.t, err) + flowConnConfig := connectionGen.GenerateFlowConnectionConfigs() limits := peerflow.CDCFlowLimits{ ExitAfterRecords: 0, @@ -511,8 +506,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_1_BQ() { CdcStagingPath: "", } - flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - require.NoError(s.t, err) + flowConnConfig := connectionGen.GenerateFlowConnectionConfigs() limits := peerflow.CDCFlowLimits{ ExitAfterRecords: 11, @@ -583,8 +577,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_2_BQ() { CdcStagingPath: "", } - flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - require.NoError(s.t, err) + flowConnConfig := connectionGen.GenerateFlowConnectionConfigs() limits := peerflow.CDCFlowLimits{ ExitAfterRecords: 6, @@ -650,8 +643,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_3_BQ() { CdcStagingPath: "", } - flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - require.NoError(s.t, err) + flowConnConfig := connectionGen.GenerateFlowConnectionConfigs() limits := peerflow.CDCFlowLimits{ ExitAfterRecords: 4, @@ -722,8 +714,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Types_BQ() { CdcStagingPath: "", } - flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - require.NoError(s.t, err) + flowConnConfig := connectionGen.GenerateFlowConnectionConfigs() limits := peerflow.CDCFlowLimits{ ExitAfterRecords: 1, @@ -797,8 +788,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_NaN_Doubles_BQ() { CdcStagingPath: "", } - flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - require.NoError(s.t, err) + flowConnConfig := connectionGen.GenerateFlowConnectionConfigs() limits := peerflow.CDCFlowLimits{ ExitAfterRecords: 1, @@ -855,8 +845,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Invalid_Geo_BQ_Avro_CDC() { CdcStagingPath: "", } - flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - require.NoError(s.t, err) + flowConnConfig := connectionGen.GenerateFlowConnectionConfigs() limits := peerflow.CDCFlowLimits{ ExitAfterRecords: 10, @@ -936,8 +925,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Multi_Table_BQ() { CdcStagingPath: "", } - flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - require.NoError(s.t, err) + flowConnConfig := connectionGen.GenerateFlowConnectionConfigs() limits := peerflow.CDCFlowLimits{ ExitAfterRecords: 2, @@ -996,8 +984,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Simple_Schema_Changes_BQ() { CdcStagingPath: "", } - flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - require.NoError(s.t, err) + flowConnConfig := connectionGen.GenerateFlowConnectionConfigs() limits := peerflow.CDCFlowLimits{ ExitAfterRecords: 1, @@ -1097,8 +1084,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_BQ() { CdcStagingPath: "", } - flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - require.NoError(s.t, err) + flowConnConfig := connectionGen.GenerateFlowConnectionConfigs() limits := peerflow.CDCFlowLimits{ ExitAfterRecords: 10, @@ -1169,8 +1155,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_1_BQ() { CdcStagingPath: "", } - flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - require.NoError(s.t, err) + flowConnConfig := connectionGen.GenerateFlowConnectionConfigs() limits := peerflow.CDCFlowLimits{ ExitAfterRecords: 20, @@ -1244,8 +1229,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_2_BQ() { CdcStagingPath: "", } - flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - require.NoError(s.t, err) + flowConnConfig := connectionGen.GenerateFlowConnectionConfigs() limits := peerflow.CDCFlowLimits{ ExitAfterRecords: 10, @@ -1311,8 +1295,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Columns_BQ() { SoftDelete: true, } - flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - require.NoError(s.t, err) + flowConnConfig := connectionGen.GenerateFlowConnectionConfigs() limits := peerflow.CDCFlowLimits{ ExitAfterRecords: 2, @@ -1376,8 +1359,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Multi_Table_Multi_Dataset_BQ() { CdcStagingPath: "", } - flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - require.NoError(s.t, err) + flowConnConfig := connectionGen.GenerateFlowConnectionConfigs() limits := peerflow.CDCFlowLimits{ ExitAfterRecords: 2, diff --git a/flow/e2e/congen.go b/flow/e2e/congen.go index df1ff17c13..0e1789fff6 100644 --- a/flow/e2e/congen.go +++ b/flow/e2e/congen.go @@ -187,7 +187,7 @@ func GenerateSnowflakePeer(snowflakeConfig *protos.SnowflakeConfig) (*protos.Pee return ret, nil } -func (c *FlowConnectionGenerationConfig) GenerateFlowConnectionConfigs() (*protos.FlowConnectionConfigs, error) { +func (c *FlowConnectionGenerationConfig) GenerateFlowConnectionConfigs() *protos.FlowConnectionConfigs { tblMappings := []*protos.TableMapping{} for k, v := range c.TableNameMapping { tblMappings = append(tblMappings, &protos.TableMapping{ @@ -207,7 +207,8 @@ func (c *FlowConnectionGenerationConfig) GenerateFlowConnectionConfigs() (*proto ret.SoftDeleteColName = "_PEERDB_IS_DELETED" } ret.SyncedAtColName = "_PEERDB_SYNCED_AT" - return ret, nil + ret.IdleTimeoutSeconds = 10 + return ret } type QRepFlowConnectionGenerationConfig struct { diff --git a/flow/e2e/postgres/peer_flow_pg_test.go b/flow/e2e/postgres/peer_flow_pg_test.go index 130c860c4c..704150731d 100644 --- a/flow/e2e/postgres/peer_flow_pg_test.go +++ b/flow/e2e/postgres/peer_flow_pg_test.go @@ -69,8 +69,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Simple_Flow_PG() { Destination: s.peer, } - flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - require.NoError(s.t, err) + flowConnConfig := connectionGen.GenerateFlowConnectionConfigs() limits := peerflow.CDCFlowLimits{ ExitAfterRecords: 10, @@ -134,8 +133,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Enums_PG() { Destination: s.peer, } - flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - require.NoError(s.t, err) + flowConnConfig := connectionGen.GenerateFlowConnectionConfigs() limits := peerflow.CDCFlowLimits{ ExitAfterRecords: 1, @@ -186,8 +184,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Simple_Schema_Changes_PG() { Destination: s.peer, } - flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - require.NoError(s.t, err) + flowConnConfig := connectionGen.GenerateFlowConnectionConfigs() limits := peerflow.CDCFlowLimits{ ExitAfterRecords: 1, @@ -348,7 +345,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Composite_PKey_PG() { Destination: s.peer, } - flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() + flowConnConfig := connectionGen.GenerateFlowConnectionConfigs() require.NoError(s.t, err) limits := peerflow.CDCFlowLimits{ @@ -425,7 +422,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Composite_PKey_Toast_1_PG() { Destination: s.peer, } - flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() + flowConnConfig := connectionGen.GenerateFlowConnectionConfigs() require.NoError(s.t, err) limits := peerflow.CDCFlowLimits{ @@ -504,7 +501,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Composite_PKey_Toast_2_PG() { Destination: s.peer, } - flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() + flowConnConfig := connectionGen.GenerateFlowConnectionConfigs() require.NoError(s.t, err) limits := peerflow.CDCFlowLimits{ @@ -573,7 +570,7 @@ func (s PeerFlowE2ETestSuitePG) Test_PeerDB_Columns() { SoftDelete: true, } - flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() + flowConnConfig := connectionGen.GenerateFlowConnectionConfigs() require.NoError(s.t, err) limits := peerflow.CDCFlowLimits{ diff --git a/flow/e2e/s3/cdc_s3_test.go b/flow/e2e/s3/cdc_s3_test.go index 4c5d8feb94..415e1fdcbb 100644 --- a/flow/e2e/s3/cdc_s3_test.go +++ b/flow/e2e/s3/cdc_s3_test.go @@ -40,8 +40,7 @@ func (s PeerFlowE2ETestSuiteS3) Test_Complete_Simple_Flow_S3() { Destination: s.s3Helper.GetPeer(), } - flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - require.NoError(s.t, err) + flowConnConfig := connectionGen.GenerateFlowConnectionConfigs() limits := peerflow.CDCFlowLimits{ TotalSyncFlows: 4, diff --git a/flow/e2e/snowflake/peer_flow_sf_test.go b/flow/e2e/snowflake/peer_flow_sf_test.go index 7ffa8763c9..b6fcdd50f8 100644 --- a/flow/e2e/snowflake/peer_flow_sf_test.go +++ b/flow/e2e/snowflake/peer_flow_sf_test.go @@ -152,8 +152,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Complete_Simple_Flow_SF() { Destination: s.sfHelper.Peer, } - flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - require.NoError(s.t, err) + flowConnConfig := connectionGen.GenerateFlowConnectionConfigs() limits := peerflow.CDCFlowLimits{ ExitAfterRecords: 20, @@ -228,8 +227,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Flow_ReplicaIdentity_Index_No_Pkey() { Destination: s.sfHelper.Peer, } - flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - require.NoError(s.t, err) + flowConnConfig := connectionGen.GenerateFlowConnectionConfigs() limits := peerflow.CDCFlowLimits{ ExitAfterRecords: 20, @@ -289,8 +287,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Invalid_Geo_SF_Avro_CDC() { Destination: s.sfHelper.Peer, } - flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - require.NoError(s.t, err) + flowConnConfig := connectionGen.GenerateFlowConnectionConfigs() limits := peerflow.CDCFlowLimits{ ExitAfterRecords: 10, @@ -370,8 +367,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Toast_SF() { Destination: s.sfHelper.Peer, } - flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - require.NoError(s.t, err) + flowConnConfig := connectionGen.GenerateFlowConnectionConfigs() limits := peerflow.CDCFlowLimits{ ExitAfterRecords: 4, @@ -437,8 +433,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Toast_Nochanges_SF() { Destination: s.sfHelper.Peer, } - flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - require.NoError(s.t, err) + flowConnConfig := connectionGen.GenerateFlowConnectionConfigs() limits := peerflow.CDCFlowLimits{ ExitAfterRecords: 0, @@ -498,8 +493,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Toast_Advance_1_SF() { Destination: s.sfHelper.Peer, } - flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - require.NoError(s.t, err) + flowConnConfig := connectionGen.GenerateFlowConnectionConfigs() limits := peerflow.CDCFlowLimits{ ExitAfterRecords: 11, @@ -569,8 +563,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Toast_Advance_2_SF() { Destination: s.sfHelper.Peer, } - flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - require.NoError(s.t, err) + flowConnConfig := connectionGen.GenerateFlowConnectionConfigs() limits := peerflow.CDCFlowLimits{ ExitAfterRecords: 6, @@ -635,8 +628,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Toast_Advance_3_SF() { Destination: s.sfHelper.Peer, } - flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - require.NoError(s.t, err) + flowConnConfig := connectionGen.GenerateFlowConnectionConfigs() limits := peerflow.CDCFlowLimits{ ExitAfterRecords: 4, @@ -706,8 +698,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Types_SF() { Destination: s.sfHelper.Peer, } - flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - require.NoError(s.t, err) + flowConnConfig := connectionGen.GenerateFlowConnectionConfigs() limits := peerflow.CDCFlowLimits{ ExitAfterRecords: 1, @@ -785,8 +776,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Multi_Table_SF() { Destination: s.sfHelper.Peer, } - flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - require.NoError(s.t, err) + flowConnConfig := connectionGen.GenerateFlowConnectionConfigs() limits := peerflow.CDCFlowLimits{ ExitAfterRecords: 2, @@ -842,8 +832,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Simple_Schema_Changes_SF() { Destination: s.sfHelper.Peer, } - flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - require.NoError(s.t, err) + flowConnConfig := connectionGen.GenerateFlowConnectionConfigs() limits := peerflow.CDCFlowLimits{ ExitAfterRecords: 1, @@ -1032,8 +1021,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Composite_PKey_SF() { Destination: s.sfHelper.Peer, } - flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - require.NoError(s.t, err) + flowConnConfig := connectionGen.GenerateFlowConnectionConfigs() limits := peerflow.CDCFlowLimits{ ExitAfterRecords: 10, @@ -1104,8 +1092,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Composite_PKey_Toast_1_SF() { Destination: s.sfHelper.Peer, } - flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - require.NoError(s.t, err) + flowConnConfig := connectionGen.GenerateFlowConnectionConfigs() limits := peerflow.CDCFlowLimits{ ExitAfterRecords: 20, @@ -1178,8 +1165,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Composite_PKey_Toast_2_SF() { Destination: s.sfHelper.Peer, } - flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - require.NoError(s.t, err) + flowConnConfig := connectionGen.GenerateFlowConnectionConfigs() limits := peerflow.CDCFlowLimits{ ExitAfterRecords: 10, @@ -1660,8 +1646,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Supported_Mixed_Case_Table_SF() { Destination: s.sfHelper.Peer, } - flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - require.NoError(s.t, err) + flowConnConfig := connectionGen.GenerateFlowConnectionConfigs() limits := peerflow.CDCFlowLimits{ ExitAfterRecords: 20,