Skip to content

Commit

Permalink
Remove error from GenerateFlowConnectionConfigs return type (#1035)
Browse files Browse the repository at this point in the history
Also move PEERDB_CDC_IDLE_TIMEOUT_SECONDS to being specified by test
  • Loading branch information
serprex authored Jan 9, 2024
1 parent c46636c commit 2306019
Show file tree
Hide file tree
Showing 6 changed files with 44 additions and 81 deletions.
1 change: 0 additions & 1 deletion .github/workflows/flow.yml
Original file line number Diff line number Diff line change
Expand Up @@ -126,4 +126,3 @@ jobs:
PEERDB_CATALOG_USER: postgres
PEERDB_CATALOG_PASSWORD: postgres
PEERDB_CATALOG_DATABASE: postgres
PEERDB_CDC_IDLE_TIMEOUT_SECONDS: 10
54 changes: 18 additions & 36 deletions flow/e2e/bigquery/peer_flow_bq_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,8 +228,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Complete_Flow_No_Data() {
CdcStagingPath: "",
}

flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs()
require.NoError(s.t, err)
flowConnConfig := connectionGen.GenerateFlowConnectionConfigs()

limits := peerflow.CDCFlowLimits{
ExitAfterRecords: 0,
Expand Down Expand Up @@ -270,8 +269,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Char_ColType_Error() {
CdcStagingPath: "",
}

flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs()
require.NoError(s.t, err)
flowConnConfig := connectionGen.GenerateFlowConnectionConfigs()

limits := peerflow.CDCFlowLimits{
ExitAfterRecords: 0,
Expand Down Expand Up @@ -315,8 +313,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Complete_Simple_Flow_BQ() {
CdcStagingPath: "",
}

flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs()
require.NoError(s.t, err)
flowConnConfig := connectionGen.GenerateFlowConnectionConfigs()

limits := peerflow.CDCFlowLimits{
ExitAfterRecords: 10,
Expand Down Expand Up @@ -381,8 +378,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Toast_BQ() {
CdcStagingPath: "",
}

flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs()
require.NoError(s.t, err)
flowConnConfig := connectionGen.GenerateFlowConnectionConfigs()

limits := peerflow.CDCFlowLimits{
ExitAfterRecords: 4,
Expand Down Expand Up @@ -448,8 +444,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Nochanges_BQ() {
CdcStagingPath: "",
}

flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs()
require.NoError(s.t, err)
flowConnConfig := connectionGen.GenerateFlowConnectionConfigs()

limits := peerflow.CDCFlowLimits{
ExitAfterRecords: 0,
Expand Down Expand Up @@ -511,8 +506,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_1_BQ() {
CdcStagingPath: "",
}

flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs()
require.NoError(s.t, err)
flowConnConfig := connectionGen.GenerateFlowConnectionConfigs()

limits := peerflow.CDCFlowLimits{
ExitAfterRecords: 11,
Expand Down Expand Up @@ -583,8 +577,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_2_BQ() {
CdcStagingPath: "",
}

flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs()
require.NoError(s.t, err)
flowConnConfig := connectionGen.GenerateFlowConnectionConfigs()

limits := peerflow.CDCFlowLimits{
ExitAfterRecords: 6,
Expand Down Expand Up @@ -650,8 +643,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_3_BQ() {
CdcStagingPath: "",
}

flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs()
require.NoError(s.t, err)
flowConnConfig := connectionGen.GenerateFlowConnectionConfigs()

limits := peerflow.CDCFlowLimits{
ExitAfterRecords: 4,
Expand Down Expand Up @@ -722,8 +714,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Types_BQ() {
CdcStagingPath: "",
}

flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs()
require.NoError(s.t, err)
flowConnConfig := connectionGen.GenerateFlowConnectionConfigs()

limits := peerflow.CDCFlowLimits{
ExitAfterRecords: 1,
Expand Down Expand Up @@ -797,8 +788,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_NaN_Doubles_BQ() {
CdcStagingPath: "",
}

flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs()
require.NoError(s.t, err)
flowConnConfig := connectionGen.GenerateFlowConnectionConfigs()

limits := peerflow.CDCFlowLimits{
ExitAfterRecords: 1,
Expand Down Expand Up @@ -855,8 +845,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Invalid_Geo_BQ_Avro_CDC() {
CdcStagingPath: "",
}

flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs()
require.NoError(s.t, err)
flowConnConfig := connectionGen.GenerateFlowConnectionConfigs()

limits := peerflow.CDCFlowLimits{
ExitAfterRecords: 10,
Expand Down Expand Up @@ -936,8 +925,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Multi_Table_BQ() {
CdcStagingPath: "",
}

flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs()
require.NoError(s.t, err)
flowConnConfig := connectionGen.GenerateFlowConnectionConfigs()

limits := peerflow.CDCFlowLimits{
ExitAfterRecords: 2,
Expand Down Expand Up @@ -996,8 +984,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Simple_Schema_Changes_BQ() {
CdcStagingPath: "",
}

flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs()
require.NoError(s.t, err)
flowConnConfig := connectionGen.GenerateFlowConnectionConfigs()

limits := peerflow.CDCFlowLimits{
ExitAfterRecords: 1,
Expand Down Expand Up @@ -1097,8 +1084,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_BQ() {
CdcStagingPath: "",
}

flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs()
require.NoError(s.t, err)
flowConnConfig := connectionGen.GenerateFlowConnectionConfigs()

limits := peerflow.CDCFlowLimits{
ExitAfterRecords: 10,
Expand Down Expand Up @@ -1169,8 +1155,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_1_BQ() {
CdcStagingPath: "",
}

flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs()
require.NoError(s.t, err)
flowConnConfig := connectionGen.GenerateFlowConnectionConfigs()

limits := peerflow.CDCFlowLimits{
ExitAfterRecords: 20,
Expand Down Expand Up @@ -1244,8 +1229,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_2_BQ() {
CdcStagingPath: "",
}

flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs()
require.NoError(s.t, err)
flowConnConfig := connectionGen.GenerateFlowConnectionConfigs()

limits := peerflow.CDCFlowLimits{
ExitAfterRecords: 10,
Expand Down Expand Up @@ -1311,8 +1295,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Columns_BQ() {
SoftDelete: true,
}

flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs()
require.NoError(s.t, err)
flowConnConfig := connectionGen.GenerateFlowConnectionConfigs()

limits := peerflow.CDCFlowLimits{
ExitAfterRecords: 2,
Expand Down Expand Up @@ -1376,8 +1359,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Multi_Table_Multi_Dataset_BQ() {
CdcStagingPath: "",
}

flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs()
require.NoError(s.t, err)
flowConnConfig := connectionGen.GenerateFlowConnectionConfigs()

limits := peerflow.CDCFlowLimits{
ExitAfterRecords: 2,
Expand Down
5 changes: 3 additions & 2 deletions flow/e2e/congen.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ func GenerateSnowflakePeer(snowflakeConfig *protos.SnowflakeConfig) (*protos.Pee
return ret, nil
}

func (c *FlowConnectionGenerationConfig) GenerateFlowConnectionConfigs() (*protos.FlowConnectionConfigs, error) {
func (c *FlowConnectionGenerationConfig) GenerateFlowConnectionConfigs() *protos.FlowConnectionConfigs {
tblMappings := []*protos.TableMapping{}
for k, v := range c.TableNameMapping {
tblMappings = append(tblMappings, &protos.TableMapping{
Expand All @@ -207,7 +207,8 @@ func (c *FlowConnectionGenerationConfig) GenerateFlowConnectionConfigs() (*proto
ret.SoftDeleteColName = "_PEERDB_IS_DELETED"
}
ret.SyncedAtColName = "_PEERDB_SYNCED_AT"
return ret, nil
ret.IdleTimeoutSeconds = 10
return ret
}

type QRepFlowConnectionGenerationConfig struct {
Expand Down
17 changes: 7 additions & 10 deletions flow/e2e/postgres/peer_flow_pg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Simple_Flow_PG() {
Destination: s.peer,
}

flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs()
require.NoError(s.t, err)
flowConnConfig := connectionGen.GenerateFlowConnectionConfigs()

limits := peerflow.CDCFlowLimits{
ExitAfterRecords: 10,
Expand Down Expand Up @@ -134,8 +133,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Enums_PG() {
Destination: s.peer,
}

flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs()
require.NoError(s.t, err)
flowConnConfig := connectionGen.GenerateFlowConnectionConfigs()

limits := peerflow.CDCFlowLimits{
ExitAfterRecords: 1,
Expand Down Expand Up @@ -186,8 +184,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Simple_Schema_Changes_PG() {
Destination: s.peer,
}

flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs()
require.NoError(s.t, err)
flowConnConfig := connectionGen.GenerateFlowConnectionConfigs()

limits := peerflow.CDCFlowLimits{
ExitAfterRecords: 1,
Expand Down Expand Up @@ -348,7 +345,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Composite_PKey_PG() {
Destination: s.peer,
}

flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs()
flowConnConfig := connectionGen.GenerateFlowConnectionConfigs()
require.NoError(s.t, err)

limits := peerflow.CDCFlowLimits{
Expand Down Expand Up @@ -425,7 +422,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Composite_PKey_Toast_1_PG() {
Destination: s.peer,
}

flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs()
flowConnConfig := connectionGen.GenerateFlowConnectionConfigs()
require.NoError(s.t, err)

limits := peerflow.CDCFlowLimits{
Expand Down Expand Up @@ -504,7 +501,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Composite_PKey_Toast_2_PG() {
Destination: s.peer,
}

flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs()
flowConnConfig := connectionGen.GenerateFlowConnectionConfigs()
require.NoError(s.t, err)

limits := peerflow.CDCFlowLimits{
Expand Down Expand Up @@ -573,7 +570,7 @@ func (s PeerFlowE2ETestSuitePG) Test_PeerDB_Columns() {
SoftDelete: true,
}

flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs()
flowConnConfig := connectionGen.GenerateFlowConnectionConfigs()
require.NoError(s.t, err)

limits := peerflow.CDCFlowLimits{
Expand Down
3 changes: 1 addition & 2 deletions flow/e2e/s3/cdc_s3_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,7 @@ func (s PeerFlowE2ETestSuiteS3) Test_Complete_Simple_Flow_S3() {
Destination: s.s3Helper.GetPeer(),
}

flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs()
require.NoError(s.t, err)
flowConnConfig := connectionGen.GenerateFlowConnectionConfigs()

limits := peerflow.CDCFlowLimits{
TotalSyncFlows: 4,
Expand Down
Loading

0 comments on commit 2306019

Please sign in to comment.