diff --git a/flow/.golangci.yml b/flow/.golangci.yml index 16b2a24619..f8a051b2c6 100644 --- a/flow/.golangci.yml +++ b/flow/.golangci.yml @@ -6,6 +6,7 @@ linters: - dogsled - durationcheck - errcheck + - forbidigo - gofumpt - gosec - gosimple diff --git a/flow/connectors/bigquery/bigquery.go b/flow/connectors/bigquery/bigquery.go index 3245f84a47..cc8ebfb344 100644 --- a/flow/connectors/bigquery/bigquery.go +++ b/flow/connectors/bigquery/bigquery.go @@ -471,7 +471,6 @@ func (c *BigQueryConnector) getTableNametoUnchangedCols(flowJobName string, sync break } if err != nil { - fmt.Printf("Error while iterating through results: %v\n", err) return nil, err } resultMap[row.Tablename] = row.UnchangedToastColumns diff --git a/flow/connectors/postgres/qrep_partition_test.go b/flow/connectors/postgres/qrep_partition_test.go index 2339e93c71..73278ce531 100644 --- a/flow/connectors/postgres/qrep_partition_test.go +++ b/flow/connectors/postgres/qrep_partition_test.go @@ -104,9 +104,6 @@ func TestGetQRepPartitions(t *testing.T) { // from 2010 Jan 1 10:00 AM UTC to 2010 Jan 30 10:00 AM UTC numRows := prepareTestData(t, pool.Pool, schemaName) - secondsInADay := uint32(24 * time.Hour / time.Second) - fmt.Printf("secondsInADay: %d\n", secondsInADay) - // Define the test cases testCases := []*testCase{ newTestCaseForNumRows( diff --git a/flow/connectors/snowflake/avro_file_writer_test.go b/flow/connectors/snowflake/avro_file_writer_test.go index 0d0cdda6a1..94d66ef416 100644 --- a/flow/connectors/snowflake/avro_file_writer_test.go +++ b/flow/connectors/snowflake/avro_file_writer_test.go @@ -145,7 +145,7 @@ func TestWriteRecordsToAvroFileHappyPath(t *testing.T) { avroSchema, err := model.GetAvroSchemaDefinition("not_applicable", schema) require.NoError(t, err) - fmt.Printf("[test] avroSchema: %v\n", avroSchema) + t.Logf("[test] avroSchema: %v", avroSchema) // Call function writer := avro.NewPeerDBOCFWriter(context.Background(), @@ -173,7 +173,7 @@ func TestWriteRecordsToZstdAvroFileHappyPath(t *testing.T) { avroSchema, err := model.GetAvroSchemaDefinition("not_applicable", schema) require.NoError(t, err) - fmt.Printf("[test] avroSchema: %v\n", avroSchema) + t.Logf("[test] avroSchema: %v", avroSchema) // Call function writer := avro.NewPeerDBOCFWriter(context.Background(), @@ -201,7 +201,7 @@ func TestWriteRecordsToDeflateAvroFileHappyPath(t *testing.T) { avroSchema, err := model.GetAvroSchemaDefinition("not_applicable", schema) require.NoError(t, err) - fmt.Printf("[test] avroSchema: %v\n", avroSchema) + t.Logf("[test] avroSchema: %v", avroSchema) // Call function writer := avro.NewPeerDBOCFWriter(context.Background(), @@ -228,7 +228,7 @@ func TestWriteRecordsToAvroFileNonNull(t *testing.T) { avroSchema, err := model.GetAvroSchemaDefinition("not_applicable", schema) require.NoError(t, err) - fmt.Printf("[test] avroSchema: %v\n", avroSchema) + t.Logf("[test] avroSchema: %v", avroSchema) // Call function writer := avro.NewPeerDBOCFWriter(context.Background(), @@ -256,7 +256,7 @@ func TestWriteRecordsToAvroFileAllNulls(t *testing.T) { avroSchema, err := model.GetAvroSchemaDefinition("not_applicable", schema) require.NoError(t, err) - fmt.Printf("[test] avroSchema: %v\n", avroSchema) + t.Logf("[test] avroSchema: %v", avroSchema) // Call function writer := avro.NewPeerDBOCFWriter(context.Background(), diff --git a/flow/connectors/sql/query_executor.go b/flow/connectors/sql/query_executor.go index af7d375d79..e8627a4e5b 100644 --- a/flow/connectors/sql/query_executor.go +++ b/flow/connectors/sql/query_executor.go @@ -114,7 +114,6 @@ func (g *GenericSQLQueryExecutor) CreateTable(schema *model.QRecordSchema, schem } command := fmt.Sprintf("CREATE TABLE %s.%s (%s)", schemaName, tableName, strings.Join(fields, ", ")) - fmt.Printf("creating table %s.%s with command %s\n", schemaName, tableName, command) _, err := g.db.ExecContext(g.ctx, command) if err != nil { diff --git a/flow/e2e/bigquery/bigquery_helper.go b/flow/e2e/bigquery/bigquery_helper.go index 0cf697020b..ecc2cce1e8 100644 --- a/flow/e2e/bigquery/bigquery_helper.go +++ b/flow/e2e/bigquery/bigquery_helper.go @@ -100,7 +100,6 @@ func (b *BigQueryTestHelper) datasetExists(datasetName string) (bool, error) { if err != nil { // if err message contains `notFound` then dataset does not exist. if strings.Contains(err.Error(), "notFound") { - fmt.Printf("dataset %s does not exist\n", b.Config.DatasetId) return false, nil } @@ -134,7 +133,6 @@ func (b *BigQueryTestHelper) RecreateDataset() error { return fmt.Errorf("failed to create dataset: %w", err) } - fmt.Printf("created dataset %s successfully\n", b.datasetName) return nil } @@ -305,7 +303,6 @@ func bqSchemaToQRecordSchema(schema bigquery.Schema) (*model.QRecordSchema, erro func (b *BigQueryTestHelper) ExecuteAndProcessQuery(query string) (*model.QRecordBatch, error) { it, err := b.client.Query(query).Read(context.Background()) if err != nil { - fmt.Printf("failed to run command: %v\n", err) return nil, fmt.Errorf("failed to run command: %w", err) } @@ -317,7 +314,6 @@ func (b *BigQueryTestHelper) ExecuteAndProcessQuery(query string) (*model.QRecor break } if err != nil { - fmt.Printf("failed to iterate over query results: %v\n", err) return nil, fmt.Errorf("failed to iterate over query results: %w", err) } @@ -446,7 +442,6 @@ func (b *BigQueryTestHelper) CreateTable(tableName string, schema *model.QRecord } command := fmt.Sprintf("CREATE TABLE %s.%s (%s)", b.datasetName, tableName, strings.Join(fields, ", ")) - fmt.Printf("creating table %s with command %s\n", tableName, command) err := b.RunCommand(command) if err != nil { diff --git a/flow/e2e/bigquery/peer_flow_bq_test.go b/flow/e2e/bigquery/peer_flow_bq_test.go index c76ea2c3dc..cc0352fae3 100644 --- a/flow/e2e/bigquery/peer_flow_bq_test.go +++ b/flow/e2e/bigquery/peer_flow_bq_test.go @@ -306,7 +306,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Complete_Simple_Flow_BQ() { `, srcTableName), testKey, testValue) require.NoError(s.t, err) } - fmt.Println("Inserted 10 rows into the source table") + s.t.Log("Inserted 10 rows into the source table") }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) @@ -380,7 +380,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Toast_BQ() { END; `, srcTableName, srcTableName, srcTableName)) require.NoError(s.t, err) - fmt.Println("Executed a transaction touching toast columns") + s.t.Log("Executed a transaction touching toast columns") }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) @@ -442,7 +442,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Nochanges_BQ() { END; `, srcTableName, srcTableName)) require.NoError(s.t, err) - fmt.Println("Executed a transaction touching toast columns") + s.t.Log("Executed a transaction touching toast columns") done <- struct{}{} }() @@ -518,7 +518,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_1_BQ() { `, srcTableName, srcTableName, srcTableName, srcTableName, srcTableName, srcTableName, srcTableName, srcTableName, srcTableName, srcTableName, srcTableName, srcTableName)) require.NoError(s.t, err) - fmt.Println("Executed a transaction touching toast columns") + s.t.Log("Executed a transaction touching toast columns") }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) @@ -585,7 +585,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_2_BQ() { END; `, srcTableName, srcTableName, srcTableName, srcTableName, srcTableName, srcTableName)) require.NoError(s.t, err) - fmt.Println("Executed a transaction touching toast columns") + s.t.Log("Executed a transaction touching toast columns") }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) @@ -652,7 +652,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_3_BQ() { END; `, srcTableName, srcTableName, srcTableName, srcTableName)) require.NoError(s.t, err) - fmt.Println("Executed a transaction touching toast columns") + s.t.Log("Executed a transaction touching toast columns") }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) @@ -739,7 +739,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Types_BQ() { "c37", "c38", "c7", "c8", "c32", "c42", "c43", "c44", }) if err != nil { - fmt.Println("error %w", err) + s.t.Log(err) } // Make sure that there are no nulls s.True(noNulls) @@ -788,7 +788,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Multi_Table_BQ() { INSERT INTO %s (c1,c2) VALUES (-1,'dummy_-1'); `, srcTable1Name, srcTable2Name)) require.NoError(s.t, err) - fmt.Println("Executed an insert on two tables") + s.t.Log("Executed an insert on two tables") }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) @@ -848,7 +848,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Simple_Schema_Changes_BQ() { _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c1) VALUES ($1)`, srcTableName), 1) require.NoError(s.t, err) - fmt.Println("Inserted initial row in the source table") + s.t.Log("Inserted initial row in the source table") // verify we got our first row. e2e.NormalizeFlowCountQuery(env, connectionGen, 2) @@ -858,11 +858,11 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Simple_Schema_Changes_BQ() { _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` ALTER TABLE %s ADD COLUMN c2 BIGINT`, srcTableName)) require.NoError(s.t, err) - fmt.Println("Altered source table, added column c2") + s.t.Log("Altered source table, added column c2") _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c1,c2) VALUES ($1,$2)`, srcTableName), 2, 2) require.NoError(s.t, err) - fmt.Println("Inserted row with added c2 in the source table") + s.t.Log("Inserted row with added c2 in the source table") // verify we got our two rows, if schema did not match up it will error. e2e.NormalizeFlowCountQuery(env, connectionGen, 4) @@ -872,11 +872,11 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Simple_Schema_Changes_BQ() { _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` ALTER TABLE %s DROP COLUMN c2, ADD COLUMN c3 BIGINT`, srcTableName)) require.NoError(s.t, err) - fmt.Println("Altered source table, dropped column c2 and added column c3") + s.t.Log("Altered source table, dropped column c2 and added column c3") _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c1,c3) VALUES ($1,$2)`, srcTableName), 3, 3) require.NoError(s.t, err) - fmt.Println("Inserted row with added c3 in the source table") + s.t.Log("Inserted row with added c3 in the source table") // verify we got our two rows, if schema did not match up it will error. e2e.NormalizeFlowCountQuery(env, connectionGen, 6) @@ -886,11 +886,11 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Simple_Schema_Changes_BQ() { _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` ALTER TABLE %s DROP COLUMN c3`, srcTableName)) require.NoError(s.t, err) - fmt.Println("Altered source table, dropped column c3") + s.t.Log("Altered source table, dropped column c3") _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c1) VALUES ($1)`, srcTableName), 4) require.NoError(s.t, err) - fmt.Println("Inserted row after dropping all columns in the source table") + s.t.Log("Inserted row after dropping all columns in the source table") // verify we got our two rows, if schema did not match up it will error. e2e.NormalizeFlowCountQuery(env, connectionGen, 8) @@ -955,7 +955,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_BQ() { `, srcTableName), i, testValue) require.NoError(s.t, err) } - fmt.Println("Inserted 10 rows into the source table") + s.t.Log("Inserted 10 rows into the source table") // verify we got our 10 rows e2e.NormalizeFlowCountQuery(env, connectionGen, 2) @@ -1032,7 +1032,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_1_BQ() { `, srcTableName), i, testValue) require.NoError(s.t, err) } - fmt.Println("Inserted 10 rows into the source table") + s.t.Log("Inserted 10 rows into the source table") _, err = rowsTx.Exec(context.Background(), fmt.Sprintf(`UPDATE %s SET c1=c1+1 WHERE MOD(c2,2)=$1`, srcTableName), 1) @@ -1107,7 +1107,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_2_BQ() { `, srcTableName), i, testValue) require.NoError(s.t, err) } - fmt.Println("Inserted 10 rows into the source table") + s.t.Log("Inserted 10 rows into the source table") e2e.NormalizeFlowCountQuery(env, connectionGen, 2) _, err = s.pool.Exec(context.Background(), @@ -1240,7 +1240,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Multi_Table_Multi_Dataset_BQ() { INSERT INTO %s (c1,c2) VALUES (-1,'dummy_-1'); `, srcTable1Name, srcTable2Name)) require.NoError(s.t, err) - fmt.Println("Executed an insert on two tables") + s.t.Log("Executed an insert on two tables") }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) diff --git a/flow/e2e/bigquery/qrep_flow_bq_test.go b/flow/e2e/bigquery/qrep_flow_bq_test.go index e6d3ac9f5b..def151071c 100644 --- a/flow/e2e/bigquery/qrep_flow_bq_test.go +++ b/flow/e2e/bigquery/qrep_flow_bq_test.go @@ -23,7 +23,7 @@ func (s PeerFlowE2ETestSuiteBQ) setupBQDestinationTable(dstTable string) { // fail if table creation fails require.NoError(s.t, err) - fmt.Printf("created table on bigquery: %s.%s. %v\n", s.bqHelper.Config.DatasetId, dstTable, err) + s.t.Logf("created table on bigquery: %s.%s. %v", s.bqHelper.Config.DatasetId, dstTable, err) } func (s PeerFlowE2ETestSuiteBQ) compareTableContentsBQ(tableName string, colsString string) { @@ -39,11 +39,11 @@ func (s PeerFlowE2ETestSuiteBQ) compareTableContentsBQ(tableName string, colsStr // read rows from destination table qualifiedTableName := fmt.Sprintf("`%s.%s`", s.bqHelper.Config.DatasetId, tableName) bqSelQuery := fmt.Sprintf("SELECT %s FROM %s ORDER BY id", colsString, qualifiedTableName) - fmt.Printf("running query on bigquery: %s\n", bqSelQuery) + s.t.Logf("running query on bigquery: %s", bqSelQuery) bqRows, err := s.bqHelper.ExecuteAndProcessQuery(bqSelQuery) require.NoError(s.t, err) - s.True(pgRows.Equals(bqRows)) + e2e.RequireEqualRecordBatchs(s.t, pgRows, bqRows) } func (s PeerFlowE2ETestSuiteBQ) Test_Complete_QRep_Flow_Avro() { diff --git a/flow/e2e/postgres/peer_flow_pg_test.go b/flow/e2e/postgres/peer_flow_pg_test.go index 779beeea2a..53658bf6ff 100644 --- a/flow/e2e/postgres/peer_flow_pg_test.go +++ b/flow/e2e/postgres/peer_flow_pg_test.go @@ -84,7 +84,7 @@ func (s *PeerFlowE2ETestSuitePG) Test_Simple_Flow_PG() { `, srcTableName), testKey, testValue) s.NoError(err) } - fmt.Println("Inserted 10 rows into the source table") + s.T().Log("Inserted 10 rows into the source table") }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) @@ -141,7 +141,7 @@ func (s *PeerFlowE2ETestSuitePG) Test_Simple_Schema_Changes_PG() { _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c1) VALUES ($1)`, srcTableName), 1) s.NoError(err) - fmt.Println("Inserted initial row in the source table") + s.T().Log("Inserted initial row in the source table") // verify we got our first row. e2e.NormalizeFlowCountQuery(env, connectionGen, 2) @@ -165,11 +165,11 @@ func (s *PeerFlowE2ETestSuitePG) Test_Simple_Schema_Changes_PG() { _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` ALTER TABLE %s ADD COLUMN c2 BIGINT`, srcTableName)) s.NoError(err) - fmt.Println("Altered source table, added column c2") + s.T().Log("Altered source table, added column c2") _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c1,c2) VALUES ($1,$2)`, srcTableName), 2, 2) s.NoError(err) - fmt.Println("Inserted row with added c2 in the source table") + s.T().Log("Inserted row with added c2 in the source table") // verify we got our two rows, if schema did not match up it will error. e2e.NormalizeFlowCountQuery(env, connectionGen, 4) @@ -194,11 +194,11 @@ func (s *PeerFlowE2ETestSuitePG) Test_Simple_Schema_Changes_PG() { _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` ALTER TABLE %s DROP COLUMN c2, ADD COLUMN c3 BIGINT`, srcTableName)) s.NoError(err) - fmt.Println("Altered source table, dropped column c2 and added column c3") + s.T().Log("Altered source table, dropped column c2 and added column c3") _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c1,c3) VALUES ($1,$2)`, srcTableName), 3, 3) s.NoError(err) - fmt.Println("Inserted row with added c3 in the source table") + s.T().Log("Inserted row with added c3 in the source table") // verify we got our two rows, if schema did not match up it will error. e2e.NormalizeFlowCountQuery(env, connectionGen, 6) @@ -224,11 +224,11 @@ func (s *PeerFlowE2ETestSuitePG) Test_Simple_Schema_Changes_PG() { _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` ALTER TABLE %s DROP COLUMN c3`, srcTableName)) s.NoError(err) - fmt.Println("Altered source table, dropped column c3") + s.T().Log("Altered source table, dropped column c3") _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c1) VALUES ($1)`, srcTableName), 4) s.NoError(err) - fmt.Println("Inserted row after dropping all columns in the source table") + s.T().Log("Inserted row after dropping all columns in the source table") // verify we got our two rows, if schema did not match up it will error. e2e.NormalizeFlowCountQuery(env, connectionGen, 8) @@ -309,7 +309,7 @@ func (s *PeerFlowE2ETestSuitePG) Test_Composite_PKey_PG() { `, srcTableName), i, testValue) s.NoError(err) } - fmt.Println("Inserted 10 rows into the source table") + s.T().Log("Inserted 10 rows into the source table") // verify we got our 10 rows e2e.NormalizeFlowCountQuery(env, connectionGen, 2) @@ -391,7 +391,7 @@ func (s *PeerFlowE2ETestSuitePG) Test_Composite_PKey_Toast_1_PG() { `, srcTableName), i, testValue) s.NoError(err) } - fmt.Println("Inserted 10 rows into the source table") + s.T().Log("Inserted 10 rows into the source table") _, err = rowsTx.Exec(context.Background(), fmt.Sprintf(`UPDATE %s SET c1=c1+1 WHERE MOD(c2,2)=$1`, srcTableName), 1) @@ -470,7 +470,7 @@ func (s *PeerFlowE2ETestSuitePG) Test_Composite_PKey_Toast_2_PG() { `, srcTableName), i, testValue) s.NoError(err) } - fmt.Println("Inserted 10 rows into the source table") + s.T().Log("Inserted 10 rows into the source table") e2e.NormalizeFlowCountQuery(env, connectionGen, 2) _, err = s.pool.Exec(context.Background(), @@ -544,7 +544,7 @@ func (s *PeerFlowE2ETestSuitePG) Test_PeerDB_Columns() { DELETE FROM %s WHERE id=1 `, srcTableName)) s.NoError(err) - fmt.Println("Inserted and deleted a row for peerdb column check") + s.T().Log("Inserted and deleted a row for peerdb column check") }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) diff --git a/flow/e2e/postgres/qrep_flow_pg_test.go b/flow/e2e/postgres/qrep_flow_pg_test.go index 880e2d19e4..d408451939 100644 --- a/flow/e2e/postgres/qrep_flow_pg_test.go +++ b/flow/e2e/postgres/qrep_flow_pg_test.go @@ -107,12 +107,14 @@ func (s *PeerFlowE2ETestSuitePG) comparePGTables(srcSchemaQualified, dstSchemaQu func (s *PeerFlowE2ETestSuitePG) compareQuery(srcSchemaQualified, dstSchemaQualified, selector string) error { query := fmt.Sprintf("SELECT %s FROM %s EXCEPT SELECT %s FROM %s", selector, srcSchemaQualified, selector, dstSchemaQualified) - rows, _ := s.pool.Query(context.Background(), query) - rowsPresent := false - + rows, err := s.pool.Query(context.Background(), query) + if err != nil { + return err + } defer rows.Close() + + errors := make([]string, 0) for rows.Next() { - rowsPresent = true values, err := rows.Values() if err != nil { return err @@ -120,17 +122,18 @@ func (s *PeerFlowE2ETestSuitePG) compareQuery(srcSchemaQualified, dstSchemaQuali columns := rows.FieldDescriptions() + errmsg := make([]string, 0, len(values)) for i, value := range values { - fmt.Printf("%s: %v\n", columns[i].Name, value) + errmsg = append(errmsg, fmt.Sprintf("%s: %v", columns[i].Name, value)) } - fmt.Println("---") + errors = append(errors, strings.Join(errmsg, "\n")) } if rows.Err() != nil { return rows.Err() } - if rowsPresent { - return fmt.Errorf("comparison failed: rows are not equal") + if len(errors) > 0 { + return fmt.Errorf("comparison failed: rows are not equal\n%s", strings.Join(errors, "\n---\n")) } return nil } diff --git a/flow/e2e/s3/cdc_s3_test.go b/flow/e2e/s3/cdc_s3_test.go index 2f4bfd4404..f28161b97b 100644 --- a/flow/e2e/s3/cdc_s3_test.go +++ b/flow/e2e/s3/cdc_s3_test.go @@ -81,9 +81,9 @@ func (s *PeerFlowE2ETestSuiteS3) Test_Complete_Simple_Flow_S3() { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() - fmt.Println("JobName: ", flowJobName) + s.T().Logf("JobName: %s", flowJobName) files, err := s.s3Helper.ListAllFiles(ctx, flowJobName) - fmt.Println("Files in Test_Complete_Simple_Flow_S3: ", len(files)) + s.T().Logf("Files in Test_Complete_Simple_Flow_S3: %d", len(files)) require.NoError(s.T(), err) require.Equal(s.T(), 4, len(files)) @@ -138,7 +138,7 @@ func (s *PeerFlowE2ETestSuiteS3) Test_Complete_Simple_Flow_GCS_Interop() { `, srcTableName), testKey, testValue) s.NoError(err) } - fmt.Println("Inserted 20 rows into the source table") + s.T().Log("Inserted 20 rows into the source table") s.NoError(err) }() @@ -154,9 +154,9 @@ func (s *PeerFlowE2ETestSuiteS3) Test_Complete_Simple_Flow_GCS_Interop() { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() - fmt.Println("JobName: ", flowJobName) + s.T().Logf("JobName: %s", flowJobName) files, err := s.s3Helper.ListAllFiles(ctx, flowJobName) - fmt.Println("Files in Test_Complete_Simple_Flow_GCS: ", len(files)) + s.T().Logf("Files in Test_Complete_Simple_Flow_GCS: %d", len(files)) require.NoError(s.T(), err) require.Equal(s.T(), 4, len(files)) diff --git a/flow/e2e/snowflake/peer_flow_sf_test.go b/flow/e2e/snowflake/peer_flow_sf_test.go index fc7e134784..314c92d2b5 100644 --- a/flow/e2e/snowflake/peer_flow_sf_test.go +++ b/flow/e2e/snowflake/peer_flow_sf_test.go @@ -153,7 +153,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Complete_Simple_Flow_SF() { `, srcTableName), testKey, testValue) require.NoError(s.t, err) } - fmt.Println("Inserted 20 rows into the source table") + s.t.Log("Inserted 20 rows into the source table") }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) @@ -231,7 +231,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Flow_ReplicaIdentity_Index_No_Pkey() { `, srcTableName), i, testKey, testValue) require.NoError(s.t, err) } - fmt.Println("Inserted 20 rows into the source table") + s.t.Log("Inserted 20 rows into the source table") }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) @@ -297,7 +297,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Invalid_Geo_SF_Avro_CDC() { ) require.NoError(s.t, err) } - fmt.Println("Inserted 4 invalid geography rows into the source table") + s.t.Log("Inserted 4 invalid geography rows into the source table") for i := 4; i < 10; i++ { _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s (line,poly) VALUES ($1,$2) @@ -307,7 +307,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Invalid_Geo_SF_Avro_CDC() { "00f03f000000000000000000000000000000000000000000000000") require.NoError(s.t, err) } - fmt.Println("Inserted 6 valid geography rows and 10 total rows into source") + s.t.Log("Inserted 6 valid geography rows and 10 total rows into source") }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) @@ -386,7 +386,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Toast_SF() { END; `, srcTableName, srcTableName, srcTableName)) require.NoError(s.t, err) - fmt.Println("Executed a transaction touching toast columns") + s.t.Log("Executed a transaction touching toast columns") }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) @@ -528,7 +528,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Toast_Advance_1_SF() { `, srcTableName, srcTableName, srcTableName, srcTableName, srcTableName, srcTableName, srcTableName, srcTableName, srcTableName, srcTableName, srcTableName, srcTableName)) require.NoError(s.t, err) - fmt.Println("Executed a transaction touching toast columns") + s.t.Log("Executed a transaction touching toast columns") }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) @@ -594,7 +594,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Toast_Advance_2_SF() { END; `, srcTableName, srcTableName, srcTableName, srcTableName, srcTableName, srcTableName)) require.NoError(s.t, err) - fmt.Println("Executed a transaction touching toast columns") + s.t.Log("Executed a transaction touching toast columns") }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) @@ -660,7 +660,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Toast_Advance_3_SF() { END; `, srcTableName, srcTableName, srcTableName, srcTableName)) require.NoError(s.t, err) - fmt.Println("Executed a transaction touching toast columns") + s.t.Log("Executed a transaction touching toast columns") }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) @@ -747,7 +747,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Types_SF() { "c37", "c38", "c7", "c8", "c32", "c42", "c43", "c44", "c45", "c46", }) if err != nil { - fmt.Println("error %w", err) + s.t.Log(err) } // Make sure that there are no nulls s.Equal(noNulls, true) @@ -852,7 +852,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Simple_Schema_Changes_SF() { _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c1) VALUES ($1)`, srcTableName), 1) require.NoError(s.t, err) - fmt.Println("Inserted initial row in the source table") + s.t.Log("Inserted initial row in the source table") // verify we got our first row. e2e.NormalizeFlowCountQuery(env, connectionGen, 2) @@ -876,11 +876,11 @@ func (s PeerFlowE2ETestSuiteSF) Test_Simple_Schema_Changes_SF() { _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` ALTER TABLE %s ADD COLUMN c2 BIGINT`, srcTableName)) require.NoError(s.t, err) - fmt.Println("Altered source table, added column c2") + s.t.Log("Altered source table, added column c2") _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c1,c2) VALUES ($1,$2)`, srcTableName), 2, 2) require.NoError(s.t, err) - fmt.Println("Inserted row with added c2 in the source table") + s.t.Log("Inserted row with added c2 in the source table") // verify we got our two rows, if schema did not match up it will error. e2e.NormalizeFlowCountQuery(env, connectionGen, 4) @@ -905,11 +905,11 @@ func (s PeerFlowE2ETestSuiteSF) Test_Simple_Schema_Changes_SF() { _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` ALTER TABLE %s DROP COLUMN c2, ADD COLUMN c3 BIGINT`, srcTableName)) require.NoError(s.t, err) - fmt.Println("Altered source table, dropped column c2 and added column c3") + s.t.Log("Altered source table, dropped column c2 and added column c3") _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c1,c3) VALUES ($1,$2)`, srcTableName), 3, 3) require.NoError(s.t, err) - fmt.Println("Inserted row with added c3 in the source table") + s.t.Log("Inserted row with added c3 in the source table") // verify we got our two rows, if schema did not match up it will error. e2e.NormalizeFlowCountQuery(env, connectionGen, 6) @@ -935,11 +935,11 @@ func (s PeerFlowE2ETestSuiteSF) Test_Simple_Schema_Changes_SF() { _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` ALTER TABLE %s DROP COLUMN c3`, srcTableName)) require.NoError(s.t, err) - fmt.Println("Altered source table, dropped column c3") + s.t.Log("Altered source table, dropped column c3") _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c1) VALUES ($1)`, srcTableName), 4) require.NoError(s.t, err) - fmt.Println("Inserted row after dropping all columns in the source table") + s.t.Log("Inserted row after dropping all columns in the source table") // verify we got our two rows, if schema did not match up it will error. e2e.NormalizeFlowCountQuery(env, connectionGen, 8) @@ -1019,7 +1019,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Composite_PKey_SF() { `, srcTableName), i, testValue) require.NoError(s.t, err) } - fmt.Println("Inserted 10 rows into the source table") + s.t.Log("Inserted 10 rows into the source table") // verify we got our 10 rows e2e.NormalizeFlowCountQuery(env, connectionGen, 2) @@ -1096,7 +1096,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Composite_PKey_Toast_1_SF() { `, srcTableName), i, testValue) require.NoError(s.t, err) } - fmt.Println("Inserted 10 rows into the source table") + s.t.Log("Inserted 10 rows into the source table") _, err = rowsTx.Exec(context.Background(), fmt.Sprintf(`UPDATE %s SET c1=c1+1 WHERE MOD(c2,2)=$1`, srcTableName), 1) @@ -1170,7 +1170,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Composite_PKey_Toast_2_SF() { `, srcTableName), i, testValue) require.NoError(s.t, err) } - fmt.Println("Inserted 10 rows into the source table") + s.t.Log("Inserted 10 rows into the source table") e2e.NormalizeFlowCountQuery(env, connectionGen, 2) _, err = s.pool.Exec(context.Background(), @@ -1251,7 +1251,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Column_Exclusion() { `, srcTableName), i, testValue) require.NoError(s.t, err) } - fmt.Println("Inserted 10 rows into the source table") + s.t.Log("Inserted 10 rows into the source table") e2e.NormalizeFlowCountQuery(env, connectionGen, 2) _, err = s.pool.Exec(context.Background(), diff --git a/flow/e2e/snowflake/qrep_flow_sf_test.go b/flow/e2e/snowflake/qrep_flow_sf_test.go index a1fe767874..f45a78d7c1 100644 --- a/flow/e2e/snowflake/qrep_flow_sf_test.go +++ b/flow/e2e/snowflake/qrep_flow_sf_test.go @@ -43,13 +43,12 @@ func (s PeerFlowE2ETestSuiteSF) compareTableContentsWithDiffSelectorsSF(tableNam qualifiedTableName = fmt.Sprintf(`%s.%s.%s`, s.sfHelper.testDatabaseName, s.sfHelper.testSchemaName, tableName) } - sfSelQuery := fmt.Sprintf(`SELECT %s FROM %s ORDER BY id`, sfSelector, qualifiedTableName) - fmt.Printf("running query on snowflake: %s\n", sfSelQuery) - + sfSelQuery := fmt.Sprintf(`SELECT %s FROM %s ORDER BY id`, sfSelector, qualifiedTableName) + s.t.Logf("running query on snowflake: %s\n", sfSelQuery) sfRows, err := s.sfHelper.ExecuteAndProcessQuery(sfSelQuery) require.NoError(s.t, err) - require.True(s.t, pgRows.Equals(sfRows), "rows from source and destination tables are not equal") + e2e.RequireEqualRecordBatchs(s.t, pgRows, sfRows) } func (s PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF() { diff --git a/flow/e2e/test_utils.go b/flow/e2e/test_utils.go index 11be8c6ee7..4674667b97 100644 --- a/flow/e2e/test_utils.go +++ b/flow/e2e/test_utils.go @@ -13,6 +13,7 @@ import ( "github.com/PeerDB-io/peer-flow/activities" connsnowflake "github.com/PeerDB-io/peer-flow/connectors/snowflake" utils "github.com/PeerDB-io/peer-flow/connectors/utils/catalog" + "github.com/PeerDB-io/peer-flow/e2eshared" "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/PeerDB-io/peer-flow/logger" "github.com/PeerDB-io/peer-flow/model" @@ -105,7 +106,6 @@ func NormalizeFlowCountQuery(env *testsuite.TestWorkflowEnvironment, } if len(state.NormalizeFlowStatuses) >= minCount { - fmt.Println("query indicates setup is complete") break } } else { @@ -173,7 +173,6 @@ func CreateTableForQRep(pool *pgxpool.Pool, suffix string, tableName string) err return err } - fmt.Printf("created table on postgres: e2e_test_%s.%s\n", suffix, tableName) return nil } @@ -418,3 +417,41 @@ func (l *TStructuredLogger) Warn(msg string, keyvals ...interface{}) { func (l *TStructuredLogger) Error(msg string, keyvals ...interface{}) { l.logger.With(l.keyvalsToFields(keyvals)).Error(msg) } + +// Equals checks if two QRecordBatches are identical. +func RequireEqualRecordBatchs(t *testing.T, q *model.QRecordBatch, other *model.QRecordBatch) bool { + t.Helper() + + if other == nil { + t.Log("other is nil") + return q == nil + } + + // First check simple attributes + if q.NumRecords != other.NumRecords { + // print num records + t.Logf("q.NumRecords: %d", q.NumRecords) + t.Logf("other.NumRecords: %d", other.NumRecords) + return false + } + + // Compare column names + if !q.Schema.EqualNames(other.Schema) { + t.Log("Column names are not equal") + t.Logf("Schema 1: %v", q.Schema.GetColumnNames()) + t.Logf("Schema 2: %v", other.Schema.GetColumnNames()) + return false + } + + // Compare records + for i, record := range q.Records { + if !e2eshared.CheckQRecordEquality(t, record, other.Records[i]) { + t.Logf("Record %d is not equal", i) + t.Logf("Record 1: %v", record) + t.Logf("Record 2: %v", other.Records[i]) + return false + } + } + + return true +} diff --git a/flow/e2eshared/e2eshared.go b/flow/e2eshared/e2eshared.go index 56b9ffc985..e235536ade 100644 --- a/flow/e2eshared/e2eshared.go +++ b/flow/e2eshared/e2eshared.go @@ -6,6 +6,8 @@ import ( "os" "testing" + "github.com/PeerDB-io/peer-flow/model" + "github.com/ysmood/got" ) @@ -42,3 +44,23 @@ func ReadFileToBytes(path string) ([]byte, error) { return ret, nil } + +// checks if two QRecords are identical +func CheckQRecordEquality(t *testing.T, q model.QRecord, other model.QRecord) bool { + t.Helper() + + if q.NumEntries != other.NumEntries { + t.Logf("unequal entry count: %d != %d\n", q.NumEntries, other.NumEntries) + return false + } + + for i, entry := range q.Entries { + otherEntry := other.Entries[i] + if !entry.Equals(otherEntry) { + t.Logf("entry %d: %v != %v\n", i, entry, otherEntry) + return false + } + } + + return true +} diff --git a/flow/model/qrecord.go b/flow/model/qrecord.go index bc2f7f84ff..ab7fbca24e 100644 --- a/flow/model/qrecord.go +++ b/flow/model/qrecord.go @@ -1,8 +1,6 @@ package model import ( - "fmt" - "github.com/PeerDB-io/peer-flow/model/qvalue" ) @@ -20,24 +18,6 @@ func NewQRecord(n int) QRecord { } // Sets the value at the given index -func (q *QRecord) Set(idx int, value qvalue.QValue) { +func (q QRecord) Set(idx int, value qvalue.QValue) { q.Entries[idx] = value } - -// equals checks if two QRecords are identical. -func (q QRecord) equals(other QRecord) bool { - // First check simple attributes - if q.NumEntries != other.NumEntries { - return false - } - - for i, entry := range q.Entries { - otherEntry := other.Entries[i] - if !entry.Equals(otherEntry) { - fmt.Printf("entry %d: %v != %v\n", i, entry, otherEntry) - return false - } - } - - return true -} diff --git a/flow/model/qrecord_batch.go b/flow/model/qrecord_batch.go index c0fd76b643..e6cf919f66 100644 --- a/flow/model/qrecord_batch.go +++ b/flow/model/qrecord_batch.go @@ -18,42 +18,6 @@ type QRecordBatch struct { Schema *QRecordSchema } -// Equals checks if two QRecordBatches are identical. -func (q *QRecordBatch) Equals(other *QRecordBatch) bool { - if other == nil { - fmt.Printf("other is nil") - return q == nil - } - - // First check simple attributes - if q.NumRecords != other.NumRecords { - // print num records - fmt.Printf("q.NumRecords: %d\n", q.NumRecords) - fmt.Printf("other.NumRecords: %d\n", other.NumRecords) - return false - } - - // Compare column names - if !q.Schema.EqualNames(other.Schema) { - fmt.Printf("Column names are not equal\n") - fmt.Printf("Schema 1: %v\n", q.Schema.GetColumnNames()) - fmt.Printf("Schema 2: %v\n", other.Schema.GetColumnNames()) - return false - } - - // Compare records - for i, record := range q.Records { - if !record.equals(other.Records[i]) { - fmt.Printf("Record %d is not equal\n", i) - fmt.Printf("Record 1: %v\n", record) - fmt.Printf("Record 2: %v\n", other.Records[i]) - return false - } - } - - return true -} - func (q *QRecordBatch) ToQRecordStream(buffer int) (*QRecordStream, error) { stream := NewQRecordStream(buffer) diff --git a/flow/model/qrecord_test.go b/flow/model/qrecord_test.go index 5d46d61541..0cd6022e52 100644 --- a/flow/model/qrecord_test.go +++ b/flow/model/qrecord_test.go @@ -1,9 +1,11 @@ -package model +package model_test import ( "math/big" "testing" + "github.com/PeerDB-io/peer-flow/e2eshared" + "github.com/PeerDB-io/peer-flow/model" "github.com/PeerDB-io/peer-flow/model/qvalue" "github.com/google/uuid" "github.com/stretchr/testify/assert" @@ -15,17 +17,17 @@ func TestEquals(t *testing.T) { tests := []struct { name string - q1 QRecord - q2 QRecord + q1 model.QRecord + q2 model.QRecord want bool }{ { name: "Equal - Same UUID", - q1: QRecord{ + q1: model.QRecord{ NumEntries: 1, Entries: []qvalue.QValue{{Kind: qvalue.QValueKindUUID, Value: uuidVal1}}, }, - q2: QRecord{ + q2: model.QRecord{ NumEntries: 1, Entries: []qvalue.QValue{ {Kind: qvalue.QValueKindString, Value: uuidVal1.String()}, @@ -35,11 +37,11 @@ func TestEquals(t *testing.T) { }, { name: "Not Equal - Different UUID", - q1: QRecord{ + q1: model.QRecord{ NumEntries: 1, Entries: []qvalue.QValue{{Kind: qvalue.QValueKindUUID, Value: uuidVal1}}, }, - q2: QRecord{ + q2: model.QRecord{ NumEntries: 1, Entries: []qvalue.QValue{{Kind: qvalue.QValueKindUUID, Value: uuidVal2}}, }, @@ -47,13 +49,13 @@ func TestEquals(t *testing.T) { }, { name: "Equal - Same numeric", - q1: QRecord{ + q1: model.QRecord{ NumEntries: 1, Entries: []qvalue.QValue{ {Kind: qvalue.QValueKindNumeric, Value: big.NewRat(10, 2)}, }, }, - q2: QRecord{ + q2: model.QRecord{ NumEntries: 1, Entries: []qvalue.QValue{{Kind: qvalue.QValueKindString, Value: "5"}}, }, @@ -61,13 +63,13 @@ func TestEquals(t *testing.T) { }, { name: "Not Equal - Different numeric", - q1: QRecord{ + q1: model.QRecord{ NumEntries: 1, Entries: []qvalue.QValue{ {Kind: qvalue.QValueKindNumeric, Value: big.NewRat(10, 2)}, }, }, - q2: QRecord{ + q2: model.QRecord{ NumEntries: 1, Entries: []qvalue.QValue{{Kind: qvalue.QValueKindNumeric, Value: "4.99"}}, }, @@ -77,7 +79,7 @@ func TestEquals(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - assert.Equal(t, tt.q1.equals(tt.q2), tt.want) + assert.Equal(t, e2eshared.CheckQRecordEquality(t, tt.q1, tt.q2), tt.want) }) } }