From 9096d3cd9dd56e81fe56c5828baeabeac90bdb7a Mon Sep 17 00:00:00 2001 From: Kaushik Iska Date: Wed, 7 Jun 2023 16:23:28 -0400 Subject: [PATCH] Add lints for golang (#76) --- .github/workflows/golang-lint.yml | 28 ++++++++++++++++++++++++++ flow/activities/fetch_config.go | 3 +-- flow/activities/flowable.go | 9 +++++---- flow/connectors/bigquery/bigquery.go | 16 ++++++++------- flow/connectors/bigquery/qrep.go | 12 +++++++---- flow/connectors/postgres/cdc.go | 1 - flow/connectors/postgres/postgres.go | 4 +++- flow/connectors/postgres/qrep.go | 9 ++++++--- flow/connectors/snowflake/qrep.go | 9 ++++++--- flow/connectors/snowflake/snowflake.go | 9 ++++++--- flow/e2e/congen.go | 3 ++- flow/e2e/peer_flow_test.go | 1 + flow/workflows/activities.go | 6 ++++-- flow/workflows/peer_flow.go | 6 ++++-- flow/workflows/qrep_flow.go | 3 ++- flow/workflows/setup_flow.go | 3 ++- flow/workflows/sync_flow.go | 9 ++++++--- 17 files changed, 93 insertions(+), 38 deletions(-) create mode 100644 .github/workflows/golang-lint.yml diff --git a/.github/workflows/golang-lint.yml b/.github/workflows/golang-lint.yml new file mode 100644 index 0000000000..22a329c7c4 --- /dev/null +++ b/.github/workflows/golang-lint.yml @@ -0,0 +1,28 @@ +name: GolangCI-Lint + +on: [pull_request] + +jobs: + golangci-lint: + permissions: + checks: write + contents: read + pull-requests: write + runs-on: ubuntu-latest + steps: + - name: checkout + uses: actions/checkout@v3 + with: + submodules: recursive + token: ${{ secrets.SUBMODULE_CHECKOUT }} + + - name: golangci-lint + uses: reviewdog/action-golangci-lint@v2 + with: + workdir: ./flow + reporter: github-pr-review + github_token: ${{ secrets.GITHUB_TOKEN }} + golangci_lint_flags: "--timeout 10m" + fail_on_error: true + env: + REVIEWDOG_TOKEN: ${{ secrets.REVIEWDOG_TOKEN }} diff --git a/flow/activities/fetch_config.go b/flow/activities/fetch_config.go index 926d85c446..c9a0bb2460 100644 --- a/flow/activities/fetch_config.go +++ b/flow/activities/fetch_config.go @@ -20,8 +20,7 @@ type FetchConfigActivityInput struct { // FetchConfigActivity is an activity that fetches the config for the specified peer flow. // This activity is invoked by the PeerFlowWorkflow. -type FetchConfigActivity struct { -} +type FetchConfigActivity struct{} // FetchConfig retrieves the source and destination config. func (a *FetchConfigActivity) FetchConfig( diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go index a61a928c6c..38e066adc0 100644 --- a/flow/activities/flowable.go +++ b/flow/activities/flowable.go @@ -57,8 +57,7 @@ type IFlowable interface { } // FlowableActivity is the activity implementation for IFlowable. -type FlowableActivity struct { -} +type FlowableActivity struct{} // CheckConnection implements IFlowable.CheckConnection. func (a *FlowableActivity) CheckConnection( @@ -274,7 +273,8 @@ func (a *FlowableActivity) SetupQRepMetadataTables(ctx context.Context, config * // GetQRepPartitions returns the partitions for a given QRepConfig. func (a *FlowableActivity) GetQRepPartitions(ctx context.Context, config *protos.QRepConfig, - last *protos.QRepPartition) (*protos.QRepParitionResult, error) { + last *protos.QRepPartition, +) (*protos.QRepParitionResult, error) { conn, err := connectors.GetConnector(ctx, config.SourcePeer) if err != nil { return nil, fmt.Errorf("failed to get connector: %w", err) @@ -294,7 +294,8 @@ func (a *FlowableActivity) GetQRepPartitions(ctx context.Context, // ReplicateQRepPartition replicates a QRepPartition from the source to the destination. func (a *FlowableActivity) ReplicateQRepPartition(ctx context.Context, config *protos.QRepConfig, - partition *protos.QRepPartition) error { + partition *protos.QRepPartition, +) error { srcConn, err := connectors.GetConnector(ctx, config.SourcePeer) if err != nil { return fmt.Errorf("failed to get source connector: %w", err) diff --git a/flow/connectors/bigquery/bigquery.go b/flow/connectors/bigquery/bigquery.go index a679b88e44..050b1084ee 100644 --- a/flow/connectors/bigquery/bigquery.go +++ b/flow/connectors/bigquery/bigquery.go @@ -324,7 +324,7 @@ func (c *BigQueryConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S records := make([]StagingBQRecord, 0) - var first = true + first := true var firstCP int64 = 0 var lastCP int64 = 0 @@ -520,7 +520,7 @@ func (c *BigQueryConnector) NormalizeRecords(req *model.NormalizeRecordsRequest) // normalize anything between last normalized batch id to last sync batchid mergeStmts := mergeGen.GenerateMergeStmts() - //update metadata to make the last normalized batch id to the recent last sync batch id. + // update metadata to make the last normalized batch id to the recent last sync batch id. updateMetadataStmt := fmt.Sprintf( "UPDATE %s.%s SET normalize_batch_id=%d WHERE mirror_job_name = '%s';", c.datasetID, MirrorJobsTable, syncBatchID, req.FlowJobName) @@ -532,9 +532,9 @@ func (c *BigQueryConnector) NormalizeRecords(req *model.NormalizeRecordsRequest) stmts = append(stmts, updateMetadataStmt) stmts = append(stmts, "COMMIT TRANSACTION;") - //put this within a transaction - //TODO - not truncating rows in staging table as of now. - //err = c.truncateTable(staging...) + // put this within a transaction + // TODO - not truncating rows in staging table as of now. + // err = c.truncateTable(staging...) _, err = c.client.Query(strings.Join(stmts, "\n")).Read(c.ctx) if err != nil { @@ -642,7 +642,8 @@ func (c *BigQueryConnector) getUpdateMetadataStmt(jobName string, lastSyncedChec // getAppendStagingToRawStmt returns the statement to append the staging table to the raw table. func (c *BigQueryConnector) getAppendStagingToRawStmt( - rawTableName string, stagingTableName string, stagingBatchID int64) string { + rawTableName string, stagingTableName string, stagingBatchID int64, +) string { return fmt.Sprintf( "INSERT INTO %s.%s SELECT _peerdb_uid,_peerdb_timestamp,_peerdb_timestamp_nanos,_peerdb_data,_peerdb_record_type,_peerdb_match_data,_peerdb_batch_id FROM %s.%s WHERE _peerdb_staging_batch_id = %d;", c.datasetID, rawTableName, c.datasetID, stagingTableName, stagingBatchID) @@ -682,7 +683,8 @@ func (c *BigQueryConnector) GetTableSchema(req *protos.GetTableSchemaInput) (*pr // SetupNormalizedTable sets up a normalized table, implementing the Connector interface. // This runs CREATE TABLE IF NOT EXISTS on bigquery, using the schema and table name provided. func (c *BigQueryConnector) SetupNormalizedTable( - req *protos.SetupNormalizedTableInput) (*protos.SetupNormalizedTableOutput, error) { + req *protos.SetupNormalizedTableInput, +) (*protos.SetupNormalizedTableOutput, error) { // convert the column names and types to bigquery types sourceSchema := req.SourceTableSchema diff --git a/flow/connectors/bigquery/qrep.go b/flow/connectors/bigquery/qrep.go index 0844ca2592..a6dddc1ca2 100644 --- a/flow/connectors/bigquery/qrep.go +++ b/flow/connectors/bigquery/qrep.go @@ -79,19 +79,22 @@ func (q QRecordValueSaver) Save() (map[string]bigquery.Value, string, error) { } func (c *BigQueryConnector) GetQRepPartitions(config *protos.QRepConfig, - last *protos.QRepPartition) ([]*protos.QRepPartition, error) { + last *protos.QRepPartition, +) ([]*protos.QRepPartition, error) { panic("not implemented") } func (c *BigQueryConnector) PullQRepRecords(config *protos.QRepConfig, - partition *protos.QRepPartition) (*model.QRecordBatch, error) { + partition *protos.QRepPartition, +) (*model.QRecordBatch, error) { panic("not implemented") } func (c *BigQueryConnector) SyncQRepRecords( config *protos.QRepConfig, partition *protos.QRepPartition, - records *model.QRecordBatch) (int, error) { + records *model.QRecordBatch, +) (int, error) { // Ensure the destination table is available. destTable := config.DestinationTableIdentifier bqTable := c.client.Dataset(c.datasetID).Table(destTable) @@ -201,7 +204,8 @@ func (c *BigQueryConnector) SyncQRepRecords( func (c *BigQueryConnector) createMetadataInsertStatement( partition *protos.QRepPartition, jobName string, - startTime time.Time) (string, error) { + startTime time.Time, +) (string, error) { // marshal the partition to json using protojson pbytes, err := protojson.Marshal(partition) if err != nil { diff --git a/flow/connectors/postgres/cdc.go b/flow/connectors/postgres/cdc.go index c984baa7fb..32dd5c2730 100644 --- a/flow/connectors/postgres/cdc.go +++ b/flow/connectors/postgres/cdc.go @@ -168,7 +168,6 @@ func (p *PostgresCDCSource) consumeStream( log.Debugf("XLogData => WALStart %s ServerWALEnd %s ServerTime %s\n", xld.WALStart, xld.ServerWALEnd, xld.ServerTime) rec, err := p.processMessage(xld) - if err != nil { return nil, fmt.Errorf("error processing message: %w", err) } diff --git a/flow/connectors/postgres/postgres.go b/flow/connectors/postgres/postgres.go index 4e5a1c9dfb..2ae5002925 100644 --- a/flow/connectors/postgres/postgres.go +++ b/flow/connectors/postgres/postgres.go @@ -81,6 +81,7 @@ func (c *PostgresConnector) GetLastOffset(jobName string) (*protos.LastSyncState func (c *PostgresConnector) GetLastSyncBatchId(jobName string) (int64, error) { panic("not implemented") } + func (c *PostgresConnector) GetLastNormalizeBatchId(jobName string) (int64, error) { panic("not implemented") } @@ -292,7 +293,8 @@ func (c *PostgresConnector) GetTableSchema(req *protos.GetTableSchemaInput) (*pr // SetupNormalizedTable sets up a normalized table, implementing the Connector interface. func (c *PostgresConnector) SetupNormalizedTable( - req *protos.SetupNormalizedTableInput) (*protos.SetupNormalizedTableOutput, error) { + req *protos.SetupNormalizedTableInput, +) (*protos.SetupNormalizedTableOutput, error) { panic("not implemented") } diff --git a/flow/connectors/postgres/qrep.go b/flow/connectors/postgres/qrep.go index d5c40c2074..748cd77035 100644 --- a/flow/connectors/postgres/qrep.go +++ b/flow/connectors/postgres/qrep.go @@ -16,7 +16,8 @@ import ( ) func (c *PostgresConnector) GetQRepPartitions(config *protos.QRepConfig, - last *protos.QRepPartition) ([]*protos.QRepPartition, error) { + last *protos.QRepPartition, +) ([]*protos.QRepPartition, error) { // For the table `config.SourceTableIdentifier` // Get the min, max value (inclusive) of `config.WatermarkColumn` extremaQuery := fmt.Sprintf("SELECT MIN(%s), MAX(%s) FROM %s", @@ -138,7 +139,8 @@ func mapRowToQRecord(row pgx.Row, fds []pgconn.FieldDescription) (*model.QRecord } func (c *PostgresConnector) PullQRepRecords(config *protos.QRepConfig, - partition *protos.QRepPartition) (*model.QRecordBatch, error) { + partition *protos.QRepPartition, +) (*model.QRecordBatch, error) { var rangeStart interface{} var rangeEnd interface{} @@ -196,7 +198,8 @@ func (c *PostgresConnector) PullQRepRecords(config *protos.QRepConfig, } func (c *PostgresConnector) SyncQRepRecords(config *protos.QRepConfig, - partition *protos.QRepPartition, records *model.QRecordBatch) (int, error) { + partition *protos.QRepPartition, records *model.QRecordBatch, +) (int, error) { return 0, fmt.Errorf("SyncQRepRecords not implemented for postgres connector") } diff --git a/flow/connectors/snowflake/qrep.go b/flow/connectors/snowflake/qrep.go index d10498b65e..367c301025 100644 --- a/flow/connectors/snowflake/qrep.go +++ b/flow/connectors/snowflake/qrep.go @@ -6,19 +6,22 @@ import ( ) func (c *SnowflakeConnector) GetQRepPartitions(config *protos.QRepConfig, - last *protos.QRepPartition) ([]*protos.QRepPartition, error) { + last *protos.QRepPartition, +) ([]*protos.QRepPartition, error) { panic("not implemented") } func (c *SnowflakeConnector) PullQRepRecords(config *protos.QRepConfig, - partition *protos.QRepPartition) (*model.QRecordBatch, error) { + partition *protos.QRepPartition, +) (*model.QRecordBatch, error) { panic("not implemented") } func (c *SnowflakeConnector) SyncQRepRecords( config *protos.QRepConfig, partition *protos.QRepPartition, - records *model.QRecordBatch) (int, error) { + records *model.QRecordBatch, +) (int, error) { panic("not implemented") } diff --git a/flow/connectors/snowflake/snowflake.go b/flow/connectors/snowflake/snowflake.go index 18ad7050f9..b800c0ad59 100644 --- a/flow/connectors/snowflake/snowflake.go +++ b/flow/connectors/snowflake/snowflake.go @@ -225,7 +225,8 @@ func (c *SnowflakeConnector) GetTableSchema(req *protos.GetTableSchemaInput) (*p } func (c *SnowflakeConnector) SetupNormalizedTable( - req *protos.SetupNormalizedTableInput) (*protos.SetupNormalizedTableOutput, error) { + req *protos.SetupNormalizedTableInput, +) (*protos.SetupNormalizedTableOutput, error) { normalizedTableInfo, err := parseTableInfo(req.TableIdentifier, req.SourceTableSchema) if err != nil { return nil, fmt.Errorf("error while parsing table schema and name: %w", err) @@ -545,7 +546,8 @@ func getRawTableIdentifier(jobName string, tableIdentifier string) string { } func (c *SnowflakeConnector) insertRecordsInRawTable(schemaIdentifier string, rawTableIdentifier string, - snowflakeRawRecords []snowflakeRawRecord, syncRecordsTx *sql.Tx) error { + snowflakeRawRecords []snowflakeRawRecord, syncRecordsTx *sql.Tx, +) error { rawRecordsData := make([]any, 0) for _, record := range snowflakeRawRecords { @@ -560,7 +562,8 @@ func (c *SnowflakeConnector) insertRecordsInRawTable(schemaIdentifier string, ra } func (c *SnowflakeConnector) generateAndExecuteMergeStatement(sourceTableInfo *tableInfo, destinationTableIdentifier string, - rawTableIdentifier string, syncBatchID int64, normalizeBatchID int64, normalizeRecordsTx *sql.Tx) error { + rawTableIdentifier string, syncBatchID int64, normalizeBatchID int64, normalizeRecordsTx *sql.Tx, +) error { normalizedTableSchema := sourceTableInfo.tableSchema // TODO: switch this to function maps.Keys when it is moved into Go's stdlib columnNames := make([]string, 0, len(normalizedTableSchema.Columns)) diff --git a/flow/e2e/congen.go b/flow/e2e/congen.go index a39e40d29f..049f0e0beb 100644 --- a/flow/e2e/congen.go +++ b/flow/e2e/congen.go @@ -89,7 +89,8 @@ func (c *FlowConnectionGenerationConfig) GenerateFlowConnectionConfigs() (*proto // GenerateQRepConfig generates a qrep config for testing. func (c *FlowConnectionGenerationConfig) GenerateQRepConfig( - query string, watermark string) (*protos.QRepConfig, error) { + query string, watermark string, +) (*protos.QRepConfig, error) { ret := &protos.QRepConfig{} ret.FlowJobName = c.FlowJobName diff --git a/flow/e2e/peer_flow_test.go b/flow/e2e/peer_flow_test.go index 2ee88069fc..7734add767 100644 --- a/flow/e2e/peer_flow_test.go +++ b/flow/e2e/peer_flow_test.go @@ -96,6 +96,7 @@ func (s *E2EPeerFlowTestSuite) setupBigQuery() error { // Implement SetupAllSuite interface to setup the test suite func (s *E2EPeerFlowTestSuite) SetupSuite() { // seed the random number generator with current time + rand.Seed(time.Now().UnixNano()) err := s.setupPostgres() diff --git a/flow/workflows/activities.go b/flow/workflows/activities.go index c8e8ce4984..b82655f366 100644 --- a/flow/workflows/activities.go +++ b/flow/workflows/activities.go @@ -2,5 +2,7 @@ package peerflow import "github.com/PeerDB-io/peer-flow/activities" -var fetchConfig *activities.FetchConfigActivity -var flowable *activities.FlowableActivity +var ( + fetchConfig *activities.FetchConfigActivity + flowable *activities.FlowableActivity +) diff --git a/flow/workflows/peer_flow.go b/flow/workflows/peer_flow.go index 879a3af330..28a35337ec 100644 --- a/flow/workflows/peer_flow.go +++ b/flow/workflows/peer_flow.go @@ -75,7 +75,8 @@ func NewPeerFlowWorkflowExecution(ctx workflow.Context, state *PeerFlowState) *P // fetchConnectionConfigs fetches the connection configs for source and destination peers. func (w *PeerFlowWorkflowExecution) fetchConnectionConfigs( - ctx workflow.Context) (*protos.FlowConnectionConfigs, error) { + ctx workflow.Context, +) (*protos.FlowConnectionConfigs, error) { w.logger.Info("fetching connection configs for peer flow - ", w.PeerFlowName) ctx = workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ @@ -108,7 +109,8 @@ func (w *PeerFlowWorkflowExecution) fetchConnectionConfigs( func (w *PeerFlowWorkflowExecution) getChildWorkflowID( ctx workflow.Context, prefix string, - peerFlowName string) (string, error) { + peerFlowName string, +) (string, error) { childWorkflowIDSideEffect := workflow.SideEffect(ctx, func(ctx workflow.Context) interface{} { return fmt.Sprintf("%s-%s-%s", prefix, peerFlowName, uuid.New().String()) }) diff --git a/flow/workflows/qrep_flow.go b/flow/workflows/qrep_flow.go index 3c984ae105..ef3aed2442 100644 --- a/flow/workflows/qrep_flow.go +++ b/flow/workflows/qrep_flow.go @@ -46,7 +46,8 @@ func (q *QRepFlowExecution) SetupMetadataTables(ctx workflow.Context) error { // GetPartitions returns the partitions to replicate. func (q *QRepFlowExecution) GetPartitions(ctx workflow.Context, - last *protos.QRepPartition) (*protos.QRepParitionResult, error) { + last *protos.QRepPartition, +) (*protos.QRepParitionResult, error) { q.logger.Info("fetching partitions to replicate for peer flow - ", q.config.FlowJobName) ctx = workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ diff --git a/flow/workflows/setup_flow.go b/flow/workflows/setup_flow.go index 56ea1a171f..8068a56169 100644 --- a/flow/workflows/setup_flow.go +++ b/flow/workflows/setup_flow.go @@ -152,7 +152,8 @@ func (s *SetupFlowExecution) createRawTable( // fetchTableSchemaAndSetupNormalizedTables fetches the table schema for the source table and // sets up the normalized tables on the destination peer. func (s *SetupFlowExecution) fetchTableSchemaAndSetupNormalizedTables( - ctx workflow.Context, flowConnectionConfigs *protos.FlowConnectionConfigs) (*protos.TableSchema, error) { + ctx workflow.Context, flowConnectionConfigs *protos.FlowConnectionConfigs, +) (*protos.TableSchema, error) { s.logger.Info("fetching table schema for peer flow - ", s.PeerFlowName) ctx = workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ diff --git a/flow/workflows/sync_flow.go b/flow/workflows/sync_flow.go index b778292f99..ae0398c518 100644 --- a/flow/workflows/sync_flow.go +++ b/flow/workflows/sync_flow.go @@ -107,7 +107,8 @@ func (s *SyncFlowExecution) executeSyncFlow( // and the checkpoint for the source peer is known. func SyncFlowWorkflow(ctx workflow.Context, config *protos.FlowConnectionConfigs, - options *protos.SyncFlowOptions) (*model.SyncResponse, error) { + options *protos.SyncFlowOptions, +) (*model.SyncResponse, error) { s := NewSyncFlowExecution(ctx, &SyncFlowState{ PeerFlowName: config.FlowJobName, Progress: []string{}, @@ -117,7 +118,8 @@ func SyncFlowWorkflow(ctx workflow.Context, } func NormalizeFlowWorkflow(ctx workflow.Context, - config *protos.FlowConnectionConfigs) (*model.NormalizeResponse, error) { + config *protos.FlowConnectionConfigs, +) (*model.NormalizeResponse, error) { s := NewNormalizeFlowExecution(ctx, &NormalizeFlowState{ PeerFlowName: config.FlowJobName, Progress: []string{}, @@ -128,7 +130,8 @@ func NormalizeFlowWorkflow(ctx workflow.Context, func (s *NormalizeFlowExecution) executeNormalizeFlow( ctx workflow.Context, - config *protos.FlowConnectionConfigs) (*model.NormalizeResponse, error) { + config *protos.FlowConnectionConfigs, +) (*model.NormalizeResponse, error) { s.logger.Info("executing normalize flow - ", s.PeerFlowName) normalizeFlowCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{