Add lints for golang (#76)
iskakaushik authored Jun 7, 2023
1 parent 59b90d1 commit 9096d3c
Showing 17 changed files with 93 additions and 38 deletions.
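The bulk of the change is a new CI workflow that lints the Go code on every pull request; the remaining sixteen files carry the mechanical formatting fixes needed to make the flow module pass the linter.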
28 changes: 28 additions & 0 deletions .github/workflows/golang-lint.yml
@@ -0,0 +1,28 @@
+name: GolangCI-Lint
+
+on: [pull_request]
+
+jobs:
+  golangci-lint:
+    permissions:
+      checks: write
+      contents: read
+      pull-requests: write
+    runs-on: ubuntu-latest
+    steps:
+      - name: checkout
+        uses: actions/checkout@v3
+        with:
+          submodules: recursive
+          token: ${{ secrets.SUBMODULE_CHECKOUT }}
+
+      - name: golangci-lint
+        uses: reviewdog/action-golangci-lint@v2
+        with:
+          workdir: ./flow
+          reporter: github-pr-review
+          github_token: ${{ secrets.GITHUB_TOKEN }}
+          golangci_lint_flags: "--timeout 10m"
+          fail_on_error: true
+        env:
+          REVIEWDOG_TOKEN: ${{ secrets.REVIEWDOG_TOKEN }}
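This workflow lints the flow module on every pull request through reviewdog, which posts findings as inline review comments (reporter: github-pr-review) and fails the check outright on lint errors (fail_on_error: true). To reproduce the check locally, run golangci-lint run --timeout 10m from the flow directory, assuming golangci-lint is installed. Note that the commit adds no .golangci.yml, so unless one already exists in the repository, golangci-lint runs with its default linter set.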
3 changes: 1 addition & 2 deletions flow/activities/fetch_config.go
@@ -20,8 +20,7 @@ type FetchConfigActivityInput struct {
 
 // FetchConfigActivity is an activity that fetches the config for the specified peer flow.
 // This activity is invoked by the PeerFlowWorkflow.
-type FetchConfigActivity struct {
-}
+type FetchConfigActivity struct{}
 
 // FetchConfig retrieves the source and destination config.
 func (a *FetchConfigActivity) FetchConfig(
9 changes: 5 additions & 4 deletions flow/activities/flowable.go
@@ -57,8 +57,7 @@ type IFlowable interface {
 }
 
 // FlowableActivity is the activity implementation for IFlowable.
-type FlowableActivity struct {
-}
+type FlowableActivity struct{}
 
 // CheckConnection implements IFlowable.CheckConnection.
 func (a *FlowableActivity) CheckConnection(
@@ -274,7 +273,8 @@ func (a *FlowableActivity) SetupQRepMetadataTables(ctx context.Context, config *
 // GetQRepPartitions returns the partitions for a given QRepConfig.
 func (a *FlowableActivity) GetQRepPartitions(ctx context.Context,
     config *protos.QRepConfig,
-    last *protos.QRepPartition) (*protos.QRepParitionResult, error) {
+    last *protos.QRepPartition,
+) (*protos.QRepParitionResult, error) {
     conn, err := connectors.GetConnector(ctx, config.SourcePeer)
     if err != nil {
         return nil, fmt.Errorf("failed to get connector: %w", err)
@@ -294,7 +294,8 @@ func (a *FlowableActivity) GetQRepPartitions(ctx context.Context,
 // ReplicateQRepPartition replicates a QRepPartition from the source to the destination.
 func (a *FlowableActivity) ReplicateQRepPartition(ctx context.Context,
     config *protos.QRepConfig,
-    partition *protos.QRepPartition) error {
+    partition *protos.QRepPartition,
+) error {
     srcConn, err := connectors.GetConnector(ctx, config.SourcePeer)
     if err != nil {
         return fmt.Errorf("failed to get source connector: %w", err)
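The same two mechanical fixes recur through the rest of the diff: empty struct declarations are collapsed onto a single line, and multi-line parameter lists gain a trailing comma so the closing parenthesis can sit on its own line. Both match the style enforced by gofumpt, a stricter superset of gofmt commonly enabled through golangci-lint — an inference, since the linter configuration itself is not part of this commit. A minimal sketch of the two rules, using hypothetical names:

package sketch

import "context"

// Collapsed empty struct: "struct {" + "}" becomes "struct{}".
type Config struct{}

type Result struct{}

// Multi-line signature: the last parameter takes a trailing comma and the
// closing parenthesis moves to its own line before the result types.
func process(ctx context.Context,
    cfg *Config,
) (*Result, error) {
    _ = ctx // placeholder body; the real logic lives in the connectors
    _ = cfg
    return &Result{}, nil
}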
16 changes: 9 additions & 7 deletions flow/connectors/bigquery/bigquery.go
@@ -324,7 +324,7 @@ func (c *BigQueryConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S
 
     records := make([]StagingBQRecord, 0)
 
-    var first = true
+    first := true
     var firstCP int64 = 0
     var lastCP int64 = 0
 
@@ -520,7 +520,7 @@ func (c *BigQueryConnector) NormalizeRecords(req *model.NormalizeRecordsRequest)
     // normalize anything between last normalized batch id to last sync batchid
     mergeStmts := mergeGen.GenerateMergeStmts()
 
-    //update metadata to make the last normalized batch id to the recent last sync batch id.
+    // update metadata to make the last normalized batch id to the recent last sync batch id.
     updateMetadataStmt := fmt.Sprintf(
         "UPDATE %s.%s SET normalize_batch_id=%d WHERE mirror_job_name = '%s';",
         c.datasetID, MirrorJobsTable, syncBatchID, req.FlowJobName)
@@ -532,9 +532,9 @@ func (c *BigQueryConnector) NormalizeRecords(req *model.NormalizeRecordsRequest)
     stmts = append(stmts, updateMetadataStmt)
     stmts = append(stmts, "COMMIT TRANSACTION;")
 
-    //put this within a transaction
-    //TODO - not truncating rows in staging table as of now.
-    //err = c.truncateTable(staging...)
+    // put this within a transaction
+    // TODO - not truncating rows in staging table as of now.
+    // err = c.truncateTable(staging...)
 
     _, err = c.client.Query(strings.Join(stmts, "\n")).Read(c.ctx)
     if err != nil {
@@ -642,7 +642,8 @@ func (c *BigQueryConnector) getUpdateMetadataStmt(jobName string, lastSyncedChec
 
 // getAppendStagingToRawStmt returns the statement to append the staging table to the raw table.
 func (c *BigQueryConnector) getAppendStagingToRawStmt(
-    rawTableName string, stagingTableName string, stagingBatchID int64) string {
+    rawTableName string, stagingTableName string, stagingBatchID int64,
+) string {
     return fmt.Sprintf(
         "INSERT INTO %s.%s SELECT _peerdb_uid,_peerdb_timestamp,_peerdb_timestamp_nanos,_peerdb_data,_peerdb_record_type,_peerdb_match_data,_peerdb_batch_id FROM %s.%s WHERE _peerdb_staging_batch_id = %d;",
         c.datasetID, rawTableName, c.datasetID, stagingTableName, stagingBatchID)
@@ -682,7 +683,8 @@ func (c *BigQueryConnector) GetTableSchema(req *protos.GetTableSchemaInput) (*pr
 // SetupNormalizedTable sets up a normalized table, implementing the Connector interface.
 // This runs CREATE TABLE IF NOT EXISTS on bigquery, using the schema and table name provided.
 func (c *BigQueryConnector) SetupNormalizedTable(
-    req *protos.SetupNormalizedTableInput) (*protos.SetupNormalizedTableOutput, error) {
+    req *protos.SetupNormalizedTableInput,
+) (*protos.SetupNormalizedTableOutput, error) {
     // convert the column names and types to bigquery types
     sourceSchema := req.SourceTableSchema
 
12 changes: 8 additions & 4 deletions flow/connectors/bigquery/qrep.go
@@ -79,19 +79,22 @@ func (q QRecordValueSaver) Save() (map[string]bigquery.Value, string, error) {
 }
 
 func (c *BigQueryConnector) GetQRepPartitions(config *protos.QRepConfig,
-    last *protos.QRepPartition) ([]*protos.QRepPartition, error) {
+    last *protos.QRepPartition,
+) ([]*protos.QRepPartition, error) {
     panic("not implemented")
 }
 
 func (c *BigQueryConnector) PullQRepRecords(config *protos.QRepConfig,
-    partition *protos.QRepPartition) (*model.QRecordBatch, error) {
+    partition *protos.QRepPartition,
+) (*model.QRecordBatch, error) {
     panic("not implemented")
 }
 
 func (c *BigQueryConnector) SyncQRepRecords(
     config *protos.QRepConfig,
     partition *protos.QRepPartition,
-    records *model.QRecordBatch) (int, error) {
+    records *model.QRecordBatch,
+) (int, error) {
     // Ensure the destination table is available.
     destTable := config.DestinationTableIdentifier
     bqTable := c.client.Dataset(c.datasetID).Table(destTable)
@@ -201,7 +204,8 @@ func (c *BigQueryConnector) SyncQRepRecords(
 func (c *BigQueryConnector) createMetadataInsertStatement(
     partition *protos.QRepPartition,
     jobName string,
-    startTime time.Time) (string, error) {
+    startTime time.Time,
+) (string, error) {
     // marshal the partition to json using protojson
     pbytes, err := protojson.Marshal(partition)
     if err != nil {
1 change: 0 additions & 1 deletion flow/connectors/postgres/cdc.go
@@ -168,7 +168,6 @@ func (p *PostgresCDCSource) consumeStream(
         log.Debugf("XLogData => WALStart %s ServerWALEnd %s ServerTime %s\n",
             xld.WALStart, xld.ServerWALEnd, xld.ServerTime)
         rec, err := p.processMessage(xld)
-
         if err != nil {
             return nil, fmt.Errorf("error processing message: %w", err)
         }
4 changes: 3 additions & 1 deletion flow/connectors/postgres/postgres.go
@@ -81,6 +81,7 @@ func (c *PostgresConnector) GetLastOffset(jobName string) (*protos.LastSyncState
 func (c *PostgresConnector) GetLastSyncBatchId(jobName string) (int64, error) {
     panic("not implemented")
 }
+
 func (c *PostgresConnector) GetLastNormalizeBatchId(jobName string) (int64, error) {
     panic("not implemented")
 }
@@ -292,7 +293,8 @@ func (c *PostgresConnector) GetTableSchema(req *protos.GetTableSchemaInput) (*pr
 
 // SetupNormalizedTable sets up a normalized table, implementing the Connector interface.
 func (c *PostgresConnector) SetupNormalizedTable(
-    req *protos.SetupNormalizedTableInput) (*protos.SetupNormalizedTableOutput, error) {
+    req *protos.SetupNormalizedTableInput,
+) (*protos.SetupNormalizedTableOutput, error) {
     panic("not implemented")
 }
 
9 changes: 6 additions & 3 deletions flow/connectors/postgres/qrep.go
@@ -16,7 +16,8 @@ import (
 )
 
 func (c *PostgresConnector) GetQRepPartitions(config *protos.QRepConfig,
-    last *protos.QRepPartition) ([]*protos.QRepPartition, error) {
+    last *protos.QRepPartition,
+) ([]*protos.QRepPartition, error) {
     // For the table `config.SourceTableIdentifier`
     // Get the min, max value (inclusive) of `config.WatermarkColumn`
     extremaQuery := fmt.Sprintf("SELECT MIN(%s), MAX(%s) FROM %s",
@@ -138,7 +139,8 @@ func mapRowToQRecord(row pgx.Row, fds []pgconn.FieldDescription) (*model.QRecord
 }
 
 func (c *PostgresConnector) PullQRepRecords(config *protos.QRepConfig,
-    partition *protos.QRepPartition) (*model.QRecordBatch, error) {
+    partition *protos.QRepPartition,
+) (*model.QRecordBatch, error) {
     var rangeStart interface{}
     var rangeEnd interface{}
 
@@ -196,7 +198,8 @@ func (c *PostgresConnector) PullQRepRecords(config *protos.QRepConfig,
 }
 
 func (c *PostgresConnector) SyncQRepRecords(config *protos.QRepConfig,
-    partition *protos.QRepPartition, records *model.QRecordBatch) (int, error) {
+    partition *protos.QRepPartition, records *model.QRecordBatch,
+) (int, error) {
     return 0, fmt.Errorf("SyncQRepRecords not implemented for postgres connector")
 }
 
9 changes: 6 additions & 3 deletions flow/connectors/snowflake/qrep.go
@@ -6,19 +6,22 @@ import (
 )
 
 func (c *SnowflakeConnector) GetQRepPartitions(config *protos.QRepConfig,
-    last *protos.QRepPartition) ([]*protos.QRepPartition, error) {
+    last *protos.QRepPartition,
+) ([]*protos.QRepPartition, error) {
     panic("not implemented")
 }
 
 func (c *SnowflakeConnector) PullQRepRecords(config *protos.QRepConfig,
-    partition *protos.QRepPartition) (*model.QRecordBatch, error) {
+    partition *protos.QRepPartition,
+) (*model.QRecordBatch, error) {
     panic("not implemented")
 }
 
 func (c *SnowflakeConnector) SyncQRepRecords(
     config *protos.QRepConfig,
     partition *protos.QRepPartition,
-    records *model.QRecordBatch) (int, error) {
+    records *model.QRecordBatch,
+) (int, error) {
     panic("not implemented")
 }
 
9 changes: 6 additions & 3 deletions flow/connectors/snowflake/snowflake.go
@@ -225,7 +225,8 @@ func (c *SnowflakeConnector) GetTableSchema(req *protos.GetTableSchemaInput) (*p
 }
 
 func (c *SnowflakeConnector) SetupNormalizedTable(
-    req *protos.SetupNormalizedTableInput) (*protos.SetupNormalizedTableOutput, error) {
+    req *protos.SetupNormalizedTableInput,
+) (*protos.SetupNormalizedTableOutput, error) {
     normalizedTableInfo, err := parseTableInfo(req.TableIdentifier, req.SourceTableSchema)
     if err != nil {
         return nil, fmt.Errorf("error while parsing table schema and name: %w", err)
@@ -545,7 +546,8 @@ func getRawTableIdentifier(jobName string, tableIdentifier string) string {
 }
 
 func (c *SnowflakeConnector) insertRecordsInRawTable(schemaIdentifier string, rawTableIdentifier string,
-    snowflakeRawRecords []snowflakeRawRecord, syncRecordsTx *sql.Tx) error {
+    snowflakeRawRecords []snowflakeRawRecord, syncRecordsTx *sql.Tx,
+) error {
     rawRecordsData := make([]any, 0)
 
     for _, record := range snowflakeRawRecords {
@@ -560,7 +562,8 @@ func (c *SnowflakeConnector) insertRecordsInRawTable(schemaIdentifier string, ra
 }
 
 func (c *SnowflakeConnector) generateAndExecuteMergeStatement(sourceTableInfo *tableInfo, destinationTableIdentifier string,
-    rawTableIdentifier string, syncBatchID int64, normalizeBatchID int64, normalizeRecordsTx *sql.Tx) error {
+    rawTableIdentifier string, syncBatchID int64, normalizeBatchID int64, normalizeRecordsTx *sql.Tx,
+) error {
     normalizedTableSchema := sourceTableInfo.tableSchema
     // TODO: switch this to function maps.Keys when it is moved into Go's stdlib
     columnNames := make([]string, 0, len(normalizedTableSchema.Columns))
3 changes: 2 additions & 1 deletion flow/e2e/congen.go
@@ -89,7 +89,8 @@ func (c *FlowConnectionGenerationConfig) GenerateFlowConnectionConfigs() (*proto
 
 // GenerateQRepConfig generates a qrep config for testing.
 func (c *FlowConnectionGenerationConfig) GenerateQRepConfig(
-    query string, watermark string) (*protos.QRepConfig, error) {
+    query string, watermark string,
+) (*protos.QRepConfig, error) {
     ret := &protos.QRepConfig{}
 
     ret.FlowJobName = c.FlowJobName
1 change: 1 addition & 0 deletions flow/e2e/peer_flow_test.go
@@ -96,6 +96,7 @@ func (s *E2EPeerFlowTestSuite) setupBigQuery() error {
 // Implement SetupAllSuite interface to setup the test suite
 func (s *E2EPeerFlowTestSuite) SetupSuite() {
     // seed the random number generator with current time
+
     rand.Seed(time.Now().UnixNano())
 
     err := s.setupPostgres()
6 changes: 4 additions & 2 deletions flow/workflows/activities.go
@@ -2,5 +2,7 @@ package peerflow
 
 import "github.com/PeerDB-io/peer-flow/activities"
 
-var fetchConfig *activities.FetchConfigActivity
-var flowable *activities.FlowableActivity
+var (
+    fetchConfig *activities.FetchConfigActivity
+    flowable    *activities.FlowableActivity
+)
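Folding the two adjacent package-level var declarations into one parenthesized block is the same kind of mechanical cleanup: it is the grouped form gofmt-family tools prefer for consecutive declarations, though which enabled linter demanded it here is not visible in the commit.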
6 changes: 4 additions & 2 deletions flow/workflows/peer_flow.go
@@ -75,7 +75,8 @@ func NewPeerFlowWorkflowExecution(ctx workflow.Context, state *PeerFlowState) *P
 
 // fetchConnectionConfigs fetches the connection configs for source and destination peers.
 func (w *PeerFlowWorkflowExecution) fetchConnectionConfigs(
-    ctx workflow.Context) (*protos.FlowConnectionConfigs, error) {
+    ctx workflow.Context,
+) (*protos.FlowConnectionConfigs, error) {
     w.logger.Info("fetching connection configs for peer flow - ", w.PeerFlowName)
 
     ctx = workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
@@ -108,7 +109,8 @@ func (w *PeerFlowWorkflowExecution) fetchConnectionConfigs(
 func (w *PeerFlowWorkflowExecution) getChildWorkflowID(
     ctx workflow.Context,
     prefix string,
-    peerFlowName string) (string, error) {
+    peerFlowName string,
+) (string, error) {
     childWorkflowIDSideEffect := workflow.SideEffect(ctx, func(ctx workflow.Context) interface{} {
         return fmt.Sprintf("%s-%s-%s", prefix, peerFlowName, uuid.New().String())
     })
3 changes: 2 additions & 1 deletion flow/workflows/qrep_flow.go
@@ -46,7 +46,8 @@ func (q *QRepFlowExecution) SetupMetadataTables(ctx workflow.Context) error {
 
 // GetPartitions returns the partitions to replicate.
 func (q *QRepFlowExecution) GetPartitions(ctx workflow.Context,
-    last *protos.QRepPartition) (*protos.QRepParitionResult, error) {
+    last *protos.QRepPartition,
+) (*protos.QRepParitionResult, error) {
     q.logger.Info("fetching partitions to replicate for peer flow - ", q.config.FlowJobName)
 
     ctx = workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
3 changes: 2 additions & 1 deletion flow/workflows/setup_flow.go
@@ -152,7 +152,8 @@ func (s *SetupFlowExecution) createRawTable(
 // fetchTableSchemaAndSetupNormalizedTables fetches the table schema for the source table and
 // sets up the normalized tables on the destination peer.
 func (s *SetupFlowExecution) fetchTableSchemaAndSetupNormalizedTables(
-    ctx workflow.Context, flowConnectionConfigs *protos.FlowConnectionConfigs) (*protos.TableSchema, error) {
+    ctx workflow.Context, flowConnectionConfigs *protos.FlowConnectionConfigs,
+) (*protos.TableSchema, error) {
     s.logger.Info("fetching table schema for peer flow - ", s.PeerFlowName)
 
     ctx = workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
9 changes: 6 additions & 3 deletions flow/workflows/sync_flow.go
@@ -107,7 +107,8 @@ func (s *SyncFlowExecution) executeSyncFlow(
 // and the checkpoint for the source peer is known.
 func SyncFlowWorkflow(ctx workflow.Context,
     config *protos.FlowConnectionConfigs,
-    options *protos.SyncFlowOptions) (*model.SyncResponse, error) {
+    options *protos.SyncFlowOptions,
+) (*model.SyncResponse, error) {
     s := NewSyncFlowExecution(ctx, &SyncFlowState{
         PeerFlowName: config.FlowJobName,
         Progress: []string{},
@@ -117,7 +118,8 @@ func SyncFlowWorkflow(ctx workflow.Context,
 }
 
 func NormalizeFlowWorkflow(ctx workflow.Context,
-    config *protos.FlowConnectionConfigs) (*model.NormalizeResponse, error) {
+    config *protos.FlowConnectionConfigs,
+) (*model.NormalizeResponse, error) {
     s := NewNormalizeFlowExecution(ctx, &NormalizeFlowState{
         PeerFlowName: config.FlowJobName,
         Progress: []string{},
@@ -128,7 +130,8 @@ func NormalizeFlowWorkflow(ctx workflow.Context,
 
 func (s *NormalizeFlowExecution) executeNormalizeFlow(
     ctx workflow.Context,
-    config *protos.FlowConnectionConfigs) (*model.NormalizeResponse, error) {
+    config *protos.FlowConnectionConfigs,
+) (*model.NormalizeResponse, error) {
     s.logger.Info("executing normalize flow - ", s.PeerFlowName)
 
     normalizeFlowCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
