Skip to content

Commit

Permalink
Merge branch 'main' into update-dependencies
Browse files Browse the repository at this point in the history
  • Loading branch information
serprex authored Dec 26, 2023
2 parents 8ffe69c + eb63a76 commit dc0b1ac
Show file tree
Hide file tree
Showing 20 changed files with 288 additions and 169 deletions.
8 changes: 6 additions & 2 deletions flow/.golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ run:
linters:
enable:
- dogsled
- dupl
- gofumpt
- gosec
- gosimple
Expand All @@ -18,9 +17,14 @@ linters:
- prealloc
- staticcheck
- ineffassign
- unparam
- unused
- lll
linters-settings:
stylecheck:
checks:
- all
- '-ST1003'
lll:
line-length: 120
line-length: 144
tab-width: 4
2 changes: 1 addition & 1 deletion flow/activities/slot.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func (a *FlowableActivity) handleSlotInfo(
return err
}

if slotInfo == nil || len(slotInfo) == 0 {
if len(slotInfo) == 0 {
slog.WarnContext(ctx, "warning: unable to get slot info", slog.Any("slotName", slotName))
return nil
}
Expand Down
5 changes: 4 additions & 1 deletion flow/cmd/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,10 @@ func APIMain(args *APIServerParams) error {
}

connOptions := client.ConnectionOptions{
TLS: &tls.Config{Certificates: certs},
TLS: &tls.Config{
Certificates: certs,
MinVersion: tls.VersionTLS13,
},
}
clientOptions.ConnectionOptions = connOptions
}
Expand Down
4 changes: 3 additions & 1 deletion flow/cmd/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,9 @@ func (h *FlowRequestHandler) CreateQRepFlow(
slog.Any("error", err), slog.String("flowName", cfg.FlowJobName))
return nil, fmt.Errorf("invalid xmin txid for xmin rep: %w", err)
}
state.LastPartition.Range = &protos.PartitionRange{Range: &protos.PartitionRange_IntRange{IntRange: &protos.IntPartitionRange{Start: txid}}}
state.LastPartition.Range = &protos.PartitionRange{
Range: &protos.PartitionRange_IntRange{IntRange: &protos.IntPartitionRange{Start: txid}},
}
}

workflowFn = peerflow.XminFlowWorkflow
Expand Down
16 changes: 8 additions & 8 deletions flow/cmd/peer_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,24 +31,24 @@ func (h *FlowRequestHandler) getPGPeerConfig(ctx context.Context, peerName strin
return &pgPeerConfig, nil
}

func (h *FlowRequestHandler) getPoolForPGPeer(ctx context.Context, peerName string) (*pgxpool.Pool, string, error) {
func (h *FlowRequestHandler) getPoolForPGPeer(ctx context.Context, peerName string) (*pgxpool.Pool, error) {
pgPeerConfig, err := h.getPGPeerConfig(ctx, peerName)
if err != nil {
return nil, "", err
return nil, err
}
connStr := utils.GetPGConnectionString(pgPeerConfig)
peerPool, err := pgxpool.New(ctx, connStr)
if err != nil {
return nil, "", err
return nil, err
}
return peerPool, pgPeerConfig.User, nil
return peerPool, nil
}

func (h *FlowRequestHandler) GetSchemas(
ctx context.Context,
req *protos.PostgresPeerActivityInfoRequest,
) (*protos.PeerSchemasResponse, error) {
peerPool, _, err := h.getPoolForPGPeer(ctx, req.PeerName)
peerPool, err := h.getPoolForPGPeer(ctx, req.PeerName)
if err != nil {
return &protos.PeerSchemasResponse{Schemas: nil}, err
}
Expand Down Expand Up @@ -78,7 +78,7 @@ func (h *FlowRequestHandler) GetTablesInSchema(
ctx context.Context,
req *protos.SchemaTablesRequest,
) (*protos.SchemaTablesResponse, error) {
peerPool, _, err := h.getPoolForPGPeer(ctx, req.PeerName)
peerPool, err := h.getPoolForPGPeer(ctx, req.PeerName)
if err != nil {
return &protos.SchemaTablesResponse{Tables: nil}, err
}
Expand Down Expand Up @@ -110,7 +110,7 @@ func (h *FlowRequestHandler) GetAllTables(
ctx context.Context,
req *protos.PostgresPeerActivityInfoRequest,
) (*protos.AllTablesResponse, error) {
peerPool, _, err := h.getPoolForPGPeer(ctx, req.PeerName)
peerPool, err := h.getPoolForPGPeer(ctx, req.PeerName)
if err != nil {
return &protos.AllTablesResponse{Tables: nil}, err
}
Expand Down Expand Up @@ -140,7 +140,7 @@ func (h *FlowRequestHandler) GetColumns(
ctx context.Context,
req *protos.TableColumnsRequest,
) (*protos.TableColumnsResponse, error) {
peerPool, _, err := h.getPoolForPGPeer(ctx, req.PeerName)
peerPool, err := h.getPoolForPGPeer(ctx, req.PeerName)
if err != nil {
return &protos.TableColumnsResponse{Columns: nil}, err
}
Expand Down
5 changes: 4 additions & 1 deletion flow/cmd/snapshot_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,10 @@ func SnapshotWorkerMain(opts *SnapshotWorkerOptions) error {
}

connOptions := client.ConnectionOptions{
TLS: &tls.Config{Certificates: certs},
TLS: &tls.Config{
Certificates: certs,
MinVersion: tls.VersionTLS13,
},
}
clientOptions.ConnectionOptions = connOptions
}
Expand Down
5 changes: 4 additions & 1 deletion flow/cmd/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,10 @@ func WorkerMain(opts *WorkerOptions) error {
return fmt.Errorf("unable to process certificate and key: %w", err)
}
connOptions := client.ConnectionOptions{
TLS: &tls.Config{Certificates: certs},
TLS: &tls.Config{
Certificates: certs,
MinVersion: tls.VersionTLS13,
},
}
clientOptions.ConnectionOptions = connOptions
}
Expand Down
Loading

0 comments on commit dc0b1ac

Please sign in to comment.