Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

go lints: enable forbidigo #923

Merged
merged 2 commits into from
Dec 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions flow/.golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ linters:
- dogsled
- durationcheck
- errcheck
- forbidigo
- gofumpt
- gosec
- gosimple
Expand Down
1 change: 0 additions & 1 deletion flow/connectors/bigquery/bigquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -471,7 +471,6 @@ func (c *BigQueryConnector) getTableNametoUnchangedCols(flowJobName string, sync
break
}
if err != nil {
fmt.Printf("Error while iterating through results: %v\n", err)
return nil, err
}
resultMap[row.Tablename] = row.UnchangedToastColumns
Expand Down
3 changes: 0 additions & 3 deletions flow/connectors/postgres/qrep_partition_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,9 +104,6 @@ func TestGetQRepPartitions(t *testing.T) {
// from 2010 Jan 1 10:00 AM UTC to 2010 Jan 30 10:00 AM UTC
numRows := prepareTestData(t, pool.Pool, schemaName)

secondsInADay := uint32(24 * time.Hour / time.Second)
fmt.Printf("secondsInADay: %d\n", secondsInADay)

// Define the test cases
testCases := []*testCase{
newTestCaseForNumRows(
Expand Down
10 changes: 5 additions & 5 deletions flow/connectors/snowflake/avro_file_writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ func TestWriteRecordsToAvroFileHappyPath(t *testing.T) {
avroSchema, err := model.GetAvroSchemaDefinition("not_applicable", schema)
require.NoError(t, err)

fmt.Printf("[test] avroSchema: %v\n", avroSchema)
t.Logf("[test] avroSchema: %v", avroSchema)

// Call function
writer := avro.NewPeerDBOCFWriter(context.Background(),
Expand Down Expand Up @@ -173,7 +173,7 @@ func TestWriteRecordsToZstdAvroFileHappyPath(t *testing.T) {
avroSchema, err := model.GetAvroSchemaDefinition("not_applicable", schema)
require.NoError(t, err)

fmt.Printf("[test] avroSchema: %v\n", avroSchema)
t.Logf("[test] avroSchema: %v", avroSchema)

// Call function
writer := avro.NewPeerDBOCFWriter(context.Background(),
Expand Down Expand Up @@ -201,7 +201,7 @@ func TestWriteRecordsToDeflateAvroFileHappyPath(t *testing.T) {
avroSchema, err := model.GetAvroSchemaDefinition("not_applicable", schema)
require.NoError(t, err)

fmt.Printf("[test] avroSchema: %v\n", avroSchema)
t.Logf("[test] avroSchema: %v", avroSchema)

// Call function
writer := avro.NewPeerDBOCFWriter(context.Background(),
Expand All @@ -228,7 +228,7 @@ func TestWriteRecordsToAvroFileNonNull(t *testing.T) {
avroSchema, err := model.GetAvroSchemaDefinition("not_applicable", schema)
require.NoError(t, err)

fmt.Printf("[test] avroSchema: %v\n", avroSchema)
t.Logf("[test] avroSchema: %v", avroSchema)

// Call function
writer := avro.NewPeerDBOCFWriter(context.Background(),
Expand Down Expand Up @@ -256,7 +256,7 @@ func TestWriteRecordsToAvroFileAllNulls(t *testing.T) {
avroSchema, err := model.GetAvroSchemaDefinition("not_applicable", schema)
require.NoError(t, err)

fmt.Printf("[test] avroSchema: %v\n", avroSchema)
t.Logf("[test] avroSchema: %v", avroSchema)

// Call function
writer := avro.NewPeerDBOCFWriter(context.Background(),
Expand Down
1 change: 0 additions & 1 deletion flow/connectors/sql/query_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,6 @@ func (g *GenericSQLQueryExecutor) CreateTable(schema *model.QRecordSchema, schem
}

command := fmt.Sprintf("CREATE TABLE %s.%s (%s)", schemaName, tableName, strings.Join(fields, ", "))
fmt.Printf("creating table %s.%s with command %s\n", schemaName, tableName, command)

_, err := g.db.ExecContext(g.ctx, command)
if err != nil {
Expand Down
5 changes: 0 additions & 5 deletions flow/e2e/bigquery/bigquery_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,6 @@ func (b *BigQueryTestHelper) datasetExists(datasetName string) (bool, error) {
if err != nil {
// if err message contains `notFound` then dataset does not exist.
if strings.Contains(err.Error(), "notFound") {
fmt.Printf("dataset %s does not exist\n", b.Config.DatasetId)
return false, nil
}

Expand Down Expand Up @@ -134,7 +133,6 @@ func (b *BigQueryTestHelper) RecreateDataset() error {
return fmt.Errorf("failed to create dataset: %w", err)
}

fmt.Printf("created dataset %s successfully\n", b.datasetName)
return nil
}

Expand Down Expand Up @@ -305,7 +303,6 @@ func bqSchemaToQRecordSchema(schema bigquery.Schema) (*model.QRecordSchema, erro
func (b *BigQueryTestHelper) ExecuteAndProcessQuery(query string) (*model.QRecordBatch, error) {
it, err := b.client.Query(query).Read(context.Background())
if err != nil {
fmt.Printf("failed to run command: %v\n", err)
return nil, fmt.Errorf("failed to run command: %w", err)
}

Expand All @@ -317,7 +314,6 @@ func (b *BigQueryTestHelper) ExecuteAndProcessQuery(query string) (*model.QRecor
break
}
if err != nil {
fmt.Printf("failed to iterate over query results: %v\n", err)
return nil, fmt.Errorf("failed to iterate over query results: %w", err)
}

Expand Down Expand Up @@ -446,7 +442,6 @@ func (b *BigQueryTestHelper) CreateTable(tableName string, schema *model.QRecord
}

command := fmt.Sprintf("CREATE TABLE %s.%s (%s)", b.datasetName, tableName, strings.Join(fields, ", "))
fmt.Printf("creating table %s with command %s\n", tableName, command)

err := b.RunCommand(command)
if err != nil {
Expand Down
38 changes: 19 additions & 19 deletions flow/e2e/bigquery/peer_flow_bq_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Complete_Simple_Flow_BQ() {
`, srcTableName), testKey, testValue)
require.NoError(s.t, err)
}
fmt.Println("Inserted 10 rows into the source table")
s.t.Log("Inserted 10 rows into the source table")
}()

env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil)
Expand Down Expand Up @@ -380,7 +380,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Toast_BQ() {
END;
`, srcTableName, srcTableName, srcTableName))
require.NoError(s.t, err)
fmt.Println("Executed a transaction touching toast columns")
s.t.Log("Executed a transaction touching toast columns")
}()

env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil)
Expand Down Expand Up @@ -442,7 +442,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Nochanges_BQ() {
END;
`, srcTableName, srcTableName))
require.NoError(s.t, err)
fmt.Println("Executed a transaction touching toast columns")
s.t.Log("Executed a transaction touching toast columns")
done <- struct{}{}
}()

Expand Down Expand Up @@ -518,7 +518,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_1_BQ() {
`, srcTableName, srcTableName, srcTableName, srcTableName, srcTableName, srcTableName,
srcTableName, srcTableName, srcTableName, srcTableName, srcTableName, srcTableName))
require.NoError(s.t, err)
fmt.Println("Executed a transaction touching toast columns")
s.t.Log("Executed a transaction touching toast columns")
}()

env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil)
Expand Down Expand Up @@ -585,7 +585,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_2_BQ() {
END;
`, srcTableName, srcTableName, srcTableName, srcTableName, srcTableName, srcTableName))
require.NoError(s.t, err)
fmt.Println("Executed a transaction touching toast columns")
s.t.Log("Executed a transaction touching toast columns")
}()

env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil)
Expand Down Expand Up @@ -652,7 +652,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_3_BQ() {
END;
`, srcTableName, srcTableName, srcTableName, srcTableName))
require.NoError(s.t, err)
fmt.Println("Executed a transaction touching toast columns")
s.t.Log("Executed a transaction touching toast columns")
}()

env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil)
Expand Down Expand Up @@ -739,7 +739,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Types_BQ() {
"c37", "c38", "c7", "c8", "c32", "c42", "c43", "c44",
})
if err != nil {
fmt.Println("error %w", err)
s.t.Log(err)
}
// Make sure that there are no nulls
s.True(noNulls)
Expand Down Expand Up @@ -788,7 +788,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Multi_Table_BQ() {
INSERT INTO %s (c1,c2) VALUES (-1,'dummy_-1');
`, srcTable1Name, srcTable2Name))
require.NoError(s.t, err)
fmt.Println("Executed an insert on two tables")
s.t.Log("Executed an insert on two tables")
}()

env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil)
Expand Down Expand Up @@ -848,7 +848,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Simple_Schema_Changes_BQ() {
_, err = s.pool.Exec(context.Background(), fmt.Sprintf(`
INSERT INTO %s(c1) VALUES ($1)`, srcTableName), 1)
require.NoError(s.t, err)
fmt.Println("Inserted initial row in the source table")
s.t.Log("Inserted initial row in the source table")

// verify we got our first row.
e2e.NormalizeFlowCountQuery(env, connectionGen, 2)
Expand All @@ -858,11 +858,11 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Simple_Schema_Changes_BQ() {
_, err = s.pool.Exec(context.Background(), fmt.Sprintf(`
ALTER TABLE %s ADD COLUMN c2 BIGINT`, srcTableName))
require.NoError(s.t, err)
fmt.Println("Altered source table, added column c2")
s.t.Log("Altered source table, added column c2")
_, err = s.pool.Exec(context.Background(), fmt.Sprintf(`
INSERT INTO %s(c1,c2) VALUES ($1,$2)`, srcTableName), 2, 2)
require.NoError(s.t, err)
fmt.Println("Inserted row with added c2 in the source table")
s.t.Log("Inserted row with added c2 in the source table")

// verify we got our two rows, if schema did not match up it will error.
e2e.NormalizeFlowCountQuery(env, connectionGen, 4)
Expand All @@ -872,11 +872,11 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Simple_Schema_Changes_BQ() {
_, err = s.pool.Exec(context.Background(), fmt.Sprintf(`
ALTER TABLE %s DROP COLUMN c2, ADD COLUMN c3 BIGINT`, srcTableName))
require.NoError(s.t, err)
fmt.Println("Altered source table, dropped column c2 and added column c3")
s.t.Log("Altered source table, dropped column c2 and added column c3")
_, err = s.pool.Exec(context.Background(), fmt.Sprintf(`
INSERT INTO %s(c1,c3) VALUES ($1,$2)`, srcTableName), 3, 3)
require.NoError(s.t, err)
fmt.Println("Inserted row with added c3 in the source table")
s.t.Log("Inserted row with added c3 in the source table")

// verify we got our two rows, if schema did not match up it will error.
e2e.NormalizeFlowCountQuery(env, connectionGen, 6)
Expand All @@ -886,11 +886,11 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Simple_Schema_Changes_BQ() {
_, err = s.pool.Exec(context.Background(), fmt.Sprintf(`
ALTER TABLE %s DROP COLUMN c3`, srcTableName))
require.NoError(s.t, err)
fmt.Println("Altered source table, dropped column c3")
s.t.Log("Altered source table, dropped column c3")
_, err = s.pool.Exec(context.Background(), fmt.Sprintf(`
INSERT INTO %s(c1) VALUES ($1)`, srcTableName), 4)
require.NoError(s.t, err)
fmt.Println("Inserted row after dropping all columns in the source table")
s.t.Log("Inserted row after dropping all columns in the source table")

// verify we got our two rows, if schema did not match up it will error.
e2e.NormalizeFlowCountQuery(env, connectionGen, 8)
Expand Down Expand Up @@ -955,7 +955,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_BQ() {
`, srcTableName), i, testValue)
require.NoError(s.t, err)
}
fmt.Println("Inserted 10 rows into the source table")
s.t.Log("Inserted 10 rows into the source table")

// verify we got our 10 rows
e2e.NormalizeFlowCountQuery(env, connectionGen, 2)
Expand Down Expand Up @@ -1032,7 +1032,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_1_BQ() {
`, srcTableName), i, testValue)
require.NoError(s.t, err)
}
fmt.Println("Inserted 10 rows into the source table")
s.t.Log("Inserted 10 rows into the source table")

_, err = rowsTx.Exec(context.Background(),
fmt.Sprintf(`UPDATE %s SET c1=c1+1 WHERE MOD(c2,2)=$1`, srcTableName), 1)
Expand Down Expand Up @@ -1107,7 +1107,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_2_BQ() {
`, srcTableName), i, testValue)
require.NoError(s.t, err)
}
fmt.Println("Inserted 10 rows into the source table")
s.t.Log("Inserted 10 rows into the source table")

e2e.NormalizeFlowCountQuery(env, connectionGen, 2)
_, err = s.pool.Exec(context.Background(),
Expand Down Expand Up @@ -1240,7 +1240,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Multi_Table_Multi_Dataset_BQ() {
INSERT INTO %s (c1,c2) VALUES (-1,'dummy_-1');
`, srcTable1Name, srcTable2Name))
require.NoError(s.t, err)
fmt.Println("Executed an insert on two tables")
s.t.Log("Executed an insert on two tables")
}()

env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil)
Expand Down
6 changes: 3 additions & 3 deletions flow/e2e/bigquery/qrep_flow_bq_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func (s PeerFlowE2ETestSuiteBQ) setupBQDestinationTable(dstTable string) {
// fail if table creation fails
require.NoError(s.t, err)

fmt.Printf("created table on bigquery: %s.%s. %v\n", s.bqHelper.Config.DatasetId, dstTable, err)
s.t.Logf("created table on bigquery: %s.%s. %v", s.bqHelper.Config.DatasetId, dstTable, err)
}

func (s PeerFlowE2ETestSuiteBQ) compareTableContentsBQ(tableName string, colsString string) {
Expand All @@ -39,11 +39,11 @@ func (s PeerFlowE2ETestSuiteBQ) compareTableContentsBQ(tableName string, colsStr
// read rows from destination table
qualifiedTableName := fmt.Sprintf("`%s.%s`", s.bqHelper.Config.DatasetId, tableName)
bqSelQuery := fmt.Sprintf("SELECT %s FROM %s ORDER BY id", colsString, qualifiedTableName)
fmt.Printf("running query on bigquery: %s\n", bqSelQuery)
s.t.Logf("running query on bigquery: %s", bqSelQuery)
bqRows, err := s.bqHelper.ExecuteAndProcessQuery(bqSelQuery)
require.NoError(s.t, err)

s.True(pgRows.Equals(bqRows))
e2e.RequireEqualRecordBatchs(s.t, pgRows, bqRows)
}

func (s PeerFlowE2ETestSuiteBQ) Test_Complete_QRep_Flow_Avro() {
Expand Down
24 changes: 12 additions & 12 deletions flow/e2e/postgres/peer_flow_pg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func (s *PeerFlowE2ETestSuitePG) Test_Simple_Flow_PG() {
`, srcTableName), testKey, testValue)
s.NoError(err)
}
fmt.Println("Inserted 10 rows into the source table")
s.T().Log("Inserted 10 rows into the source table")
}()

env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil)
Expand Down Expand Up @@ -141,7 +141,7 @@ func (s *PeerFlowE2ETestSuitePG) Test_Simple_Schema_Changes_PG() {
_, err = s.pool.Exec(context.Background(), fmt.Sprintf(`
INSERT INTO %s(c1) VALUES ($1)`, srcTableName), 1)
s.NoError(err)
fmt.Println("Inserted initial row in the source table")
s.T().Log("Inserted initial row in the source table")

// verify we got our first row.
e2e.NormalizeFlowCountQuery(env, connectionGen, 2)
Expand All @@ -165,11 +165,11 @@ func (s *PeerFlowE2ETestSuitePG) Test_Simple_Schema_Changes_PG() {
_, err = s.pool.Exec(context.Background(), fmt.Sprintf(`
ALTER TABLE %s ADD COLUMN c2 BIGINT`, srcTableName))
s.NoError(err)
fmt.Println("Altered source table, added column c2")
s.T().Log("Altered source table, added column c2")
_, err = s.pool.Exec(context.Background(), fmt.Sprintf(`
INSERT INTO %s(c1,c2) VALUES ($1,$2)`, srcTableName), 2, 2)
s.NoError(err)
fmt.Println("Inserted row with added c2 in the source table")
s.T().Log("Inserted row with added c2 in the source table")

// verify we got our two rows, if schema did not match up it will error.
e2e.NormalizeFlowCountQuery(env, connectionGen, 4)
Expand All @@ -194,11 +194,11 @@ func (s *PeerFlowE2ETestSuitePG) Test_Simple_Schema_Changes_PG() {
_, err = s.pool.Exec(context.Background(), fmt.Sprintf(`
ALTER TABLE %s DROP COLUMN c2, ADD COLUMN c3 BIGINT`, srcTableName))
s.NoError(err)
fmt.Println("Altered source table, dropped column c2 and added column c3")
s.T().Log("Altered source table, dropped column c2 and added column c3")
_, err = s.pool.Exec(context.Background(), fmt.Sprintf(`
INSERT INTO %s(c1,c3) VALUES ($1,$2)`, srcTableName), 3, 3)
s.NoError(err)
fmt.Println("Inserted row with added c3 in the source table")
s.T().Log("Inserted row with added c3 in the source table")

// verify we got our two rows, if schema did not match up it will error.
e2e.NormalizeFlowCountQuery(env, connectionGen, 6)
Expand All @@ -224,11 +224,11 @@ func (s *PeerFlowE2ETestSuitePG) Test_Simple_Schema_Changes_PG() {
_, err = s.pool.Exec(context.Background(), fmt.Sprintf(`
ALTER TABLE %s DROP COLUMN c3`, srcTableName))
s.NoError(err)
fmt.Println("Altered source table, dropped column c3")
s.T().Log("Altered source table, dropped column c3")
_, err = s.pool.Exec(context.Background(), fmt.Sprintf(`
INSERT INTO %s(c1) VALUES ($1)`, srcTableName), 4)
s.NoError(err)
fmt.Println("Inserted row after dropping all columns in the source table")
s.T().Log("Inserted row after dropping all columns in the source table")

// verify we got our two rows, if schema did not match up it will error.
e2e.NormalizeFlowCountQuery(env, connectionGen, 8)
Expand Down Expand Up @@ -309,7 +309,7 @@ func (s *PeerFlowE2ETestSuitePG) Test_Composite_PKey_PG() {
`, srcTableName), i, testValue)
s.NoError(err)
}
fmt.Println("Inserted 10 rows into the source table")
s.T().Log("Inserted 10 rows into the source table")

// verify we got our 10 rows
e2e.NormalizeFlowCountQuery(env, connectionGen, 2)
Expand Down Expand Up @@ -391,7 +391,7 @@ func (s *PeerFlowE2ETestSuitePG) Test_Composite_PKey_Toast_1_PG() {
`, srcTableName), i, testValue)
s.NoError(err)
}
fmt.Println("Inserted 10 rows into the source table")
s.T().Log("Inserted 10 rows into the source table")

_, err = rowsTx.Exec(context.Background(),
fmt.Sprintf(`UPDATE %s SET c1=c1+1 WHERE MOD(c2,2)=$1`, srcTableName), 1)
Expand Down Expand Up @@ -470,7 +470,7 @@ func (s *PeerFlowE2ETestSuitePG) Test_Composite_PKey_Toast_2_PG() {
`, srcTableName), i, testValue)
s.NoError(err)
}
fmt.Println("Inserted 10 rows into the source table")
s.T().Log("Inserted 10 rows into the source table")

e2e.NormalizeFlowCountQuery(env, connectionGen, 2)
_, err = s.pool.Exec(context.Background(),
Expand Down Expand Up @@ -544,7 +544,7 @@ func (s *PeerFlowE2ETestSuitePG) Test_PeerDB_Columns() {
DELETE FROM %s WHERE id=1
`, srcTableName))
s.NoError(err)
fmt.Println("Inserted and deleted a row for peerdb column check")
s.T().Log("Inserted and deleted a row for peerdb column check")
}()

env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil)
Expand Down
Loading
Loading