diff --git a/flow/.golangci.yml b/flow/.golangci.yml index 2c8032f91d..ab612fb893 100644 --- a/flow/.golangci.yml +++ b/flow/.golangci.yml @@ -4,7 +4,6 @@ run: linters: enable: - dogsled - - dupl - gofumpt - gosec - gosimple @@ -18,9 +17,14 @@ linters: - prealloc - staticcheck - ineffassign + - unparam - unused - lll linters-settings: + stylecheck: + checks: + - all + - '-ST1003' lll: - line-length: 120 + line-length: 144 tab-width: 4 diff --git a/flow/activities/slot.go b/flow/activities/slot.go index 117dbecea3..baa0fbc0fa 100644 --- a/flow/activities/slot.go +++ b/flow/activities/slot.go @@ -23,7 +23,7 @@ func (a *FlowableActivity) handleSlotInfo( return err } - if slotInfo == nil || len(slotInfo) == 0 { + if len(slotInfo) == 0 { slog.WarnContext(ctx, "warning: unable to get slot info", slog.Any("slotName", slotName)) return nil } diff --git a/flow/cmd/api.go b/flow/cmd/api.go index 09185a0fc1..b16034bf20 100644 --- a/flow/cmd/api.go +++ b/flow/cmd/api.go @@ -104,7 +104,10 @@ func APIMain(args *APIServerParams) error { } connOptions := client.ConnectionOptions{ - TLS: &tls.Config{Certificates: certs}, + TLS: &tls.Config{ + Certificates: certs, + MinVersion: tls.VersionTLS13, + }, } clientOptions.ConnectionOptions = connOptions } diff --git a/flow/cmd/handler.go b/flow/cmd/handler.go index 7b03c9de67..dd922ba1f5 100644 --- a/flow/cmd/handler.go +++ b/flow/cmd/handler.go @@ -261,7 +261,9 @@ func (h *FlowRequestHandler) CreateQRepFlow( slog.Any("error", err), slog.String("flowName", cfg.FlowJobName)) return nil, fmt.Errorf("invalid xmin txid for xmin rep: %w", err) } - state.LastPartition.Range = &protos.PartitionRange{Range: &protos.PartitionRange_IntRange{IntRange: &protos.IntPartitionRange{Start: txid}}} + state.LastPartition.Range = &protos.PartitionRange{ + Range: &protos.PartitionRange_IntRange{IntRange: &protos.IntPartitionRange{Start: txid}}, + } } workflowFn = peerflow.XminFlowWorkflow diff --git a/flow/cmd/peer_data.go b/flow/cmd/peer_data.go index 34f31219ed..f9383d8c5e 100644 --- a/flow/cmd/peer_data.go +++ b/flow/cmd/peer_data.go @@ -31,24 +31,24 @@ func (h *FlowRequestHandler) getPGPeerConfig(ctx context.Context, peerName strin return &pgPeerConfig, nil } -func (h *FlowRequestHandler) getPoolForPGPeer(ctx context.Context, peerName string) (*pgxpool.Pool, string, error) { +func (h *FlowRequestHandler) getPoolForPGPeer(ctx context.Context, peerName string) (*pgxpool.Pool, error) { pgPeerConfig, err := h.getPGPeerConfig(ctx, peerName) if err != nil { - return nil, "", err + return nil, err } connStr := utils.GetPGConnectionString(pgPeerConfig) peerPool, err := pgxpool.New(ctx, connStr) if err != nil { - return nil, "", err + return nil, err } - return peerPool, pgPeerConfig.User, nil + return peerPool, nil } func (h *FlowRequestHandler) GetSchemas( ctx context.Context, req *protos.PostgresPeerActivityInfoRequest, ) (*protos.PeerSchemasResponse, error) { - peerPool, _, err := h.getPoolForPGPeer(ctx, req.PeerName) + peerPool, err := h.getPoolForPGPeer(ctx, req.PeerName) if err != nil { return &protos.PeerSchemasResponse{Schemas: nil}, err } @@ -78,7 +78,7 @@ func (h *FlowRequestHandler) GetTablesInSchema( ctx context.Context, req *protos.SchemaTablesRequest, ) (*protos.SchemaTablesResponse, error) { - peerPool, _, err := h.getPoolForPGPeer(ctx, req.PeerName) + peerPool, err := h.getPoolForPGPeer(ctx, req.PeerName) if err != nil { return &protos.SchemaTablesResponse{Tables: nil}, err } @@ -110,7 +110,7 @@ func (h *FlowRequestHandler) GetAllTables( ctx context.Context, req *protos.PostgresPeerActivityInfoRequest, ) (*protos.AllTablesResponse, error) { - peerPool, _, err := h.getPoolForPGPeer(ctx, req.PeerName) + peerPool, err := h.getPoolForPGPeer(ctx, req.PeerName) if err != nil { return &protos.AllTablesResponse{Tables: nil}, err } @@ -140,7 +140,7 @@ func (h *FlowRequestHandler) GetColumns( ctx context.Context, req *protos.TableColumnsRequest, ) (*protos.TableColumnsResponse, error) { - peerPool, _, err := h.getPoolForPGPeer(ctx, req.PeerName) + peerPool, err := h.getPoolForPGPeer(ctx, req.PeerName) if err != nil { return &protos.TableColumnsResponse{Columns: nil}, err } diff --git a/flow/cmd/snapshot_worker.go b/flow/cmd/snapshot_worker.go index 16008cc6a5..c68d44d925 100644 --- a/flow/cmd/snapshot_worker.go +++ b/flow/cmd/snapshot_worker.go @@ -32,7 +32,10 @@ func SnapshotWorkerMain(opts *SnapshotWorkerOptions) error { } connOptions := client.ConnectionOptions{ - TLS: &tls.Config{Certificates: certs}, + TLS: &tls.Config{ + Certificates: certs, + MinVersion: tls.VersionTLS13, + }, } clientOptions.ConnectionOptions = connOptions } diff --git a/flow/cmd/worker.go b/flow/cmd/worker.go index eea0e9184f..f060230b63 100644 --- a/flow/cmd/worker.go +++ b/flow/cmd/worker.go @@ -100,7 +100,10 @@ func WorkerMain(opts *WorkerOptions) error { return fmt.Errorf("unable to process certificate and key: %w", err) } connOptions := client.ConnectionOptions{ - TLS: &tls.Config{Certificates: certs}, + TLS: &tls.Config{ + Certificates: certs, + MinVersion: tls.VersionTLS13, + }, } clientOptions.ConnectionOptions = connOptions } diff --git a/flow/connectors/postgres/cdc.go b/flow/connectors/postgres/cdc.go index b3686f4d09..2be3fcb2a5 100644 --- a/flow/connectors/postgres/cdc.go +++ b/flow/connectors/postgres/cdc.go @@ -360,7 +360,6 @@ func (p *PostgresCDCSource) consumeStream( p.logger.Debug(fmt.Sprintf("XLogData => WALStart %s ServerWALEnd %s ServerTime %s\n", xld.WALStart, xld.ServerWALEnd, xld.ServerTime)) rec, err := p.processMessage(records, xld, clientXLogPos) - if err != nil { return fmt.Errorf("error processing message: %w", err) } @@ -470,7 +469,8 @@ func (p *PostgresCDCSource) consumeStream( } func (p *PostgresCDCSource) processMessage(batch *model.CDCRecordStream, xld pglogrepl.XLogData, - currentClientXlogPos pglogrepl.LSN) (model.Record, error) { + currentClientXlogPos pglogrepl.LSN, +) (model.Record, error) { logicalMsg, err := pglogrepl.Parse(xld.WALData) if err != nil { return nil, fmt.Errorf("error parsing logical message: %w", err) diff --git a/flow/connectors/postgres/client.go b/flow/connectors/postgres/client.go index dc604d5631..e48c71b29d 100644 --- a/flow/connectors/postgres/client.go +++ b/flow/connectors/postgres/client.go @@ -84,9 +84,9 @@ type ReplicaIdentityType rune const ( ReplicaIdentityDefault ReplicaIdentityType = 'd' - ReplicaIdentityFull = 'f' - ReplicaIdentityIndex = 'i' - ReplicaIdentityNothing = 'n' + ReplicaIdentityFull ReplicaIdentityType = 'f' + ReplicaIdentityIndex ReplicaIdentityType = 'i' + ReplicaIdentityNothing ReplicaIdentityType = 'n' ) // getRelIDForTable returns the relation ID for a table. diff --git a/flow/connectors/snowflake/avro_file_writer_test.go b/flow/connectors/snowflake/avro_file_writer_test.go index 46f18aaa3f..f08b66a6c8 100644 --- a/flow/connectors/snowflake/avro_file_writer_test.go +++ b/flow/connectors/snowflake/avro_file_writer_test.go @@ -55,6 +55,7 @@ func createQValue(t *testing.T, kind qvalue.QValueKind, placeHolder int) qvalue. } } +// nolint:unparam func generateRecords( t *testing.T, nullable bool, diff --git a/flow/connectors/utils/ssh.go b/flow/connectors/utils/ssh.go index 7bd8ed141f..511eea672a 100644 --- a/flow/connectors/utils/ssh.go +++ b/flow/connectors/utils/ssh.go @@ -41,8 +41,9 @@ func GetSSHClientConfig(user, password, privateKeyString string) (*ssh.ClientCon } return &ssh.ClientConfig{ - User: user, - Auth: authMethods, + User: user, + Auth: authMethods, + //nolint:gosec HostKeyCallback: ssh.InsecureIgnoreHostKey(), }, nil } diff --git a/flow/e2e/snowflake/qrep_flow_sf_test.go b/flow/e2e/snowflake/qrep_flow_sf_test.go index 3ac7fee713..b3cd9b9c2a 100644 --- a/flow/e2e/snowflake/qrep_flow_sf_test.go +++ b/flow/e2e/snowflake/qrep_flow_sf_test.go @@ -12,6 +12,7 @@ import ( "github.com/stretchr/testify/require" ) +// nolint:unparam func (s PeerFlowE2ETestSuiteSF) setupSourceTable(tableName string, numRows int) { err := e2e.CreateTableForQRep(s.pool, s.pgSuffix, tableName) require.NoError(s.t, err)