Skip to content

Commit

Permalink
chore: lint fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
iamKunalGupta committed Apr 25, 2024
1 parent cc07458 commit adc17a9
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 11 deletions.
4 changes: 2 additions & 2 deletions flow/cmd/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package cmd
import (
"context"
"fmt"
"github.com/PeerDB-io/peer-flow/alerting"
"log/slog"
"strconv"
"strings"
Expand All @@ -15,6 +14,7 @@ import (
"go.temporal.io/sdk/client"
"google.golang.org/protobuf/proto"

"github.com/PeerDB-io/peer-flow/alerting"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/model"
"github.com/PeerDB-io/peer-flow/shared"
Expand All @@ -26,8 +26,8 @@ type FlowRequestHandler struct {
protos.UnimplementedFlowServiceServer
temporalClient client.Client
pool *pgxpool.Pool
peerflowTaskQueueID string
alerter *alerting.Alerter
peerflowTaskQueueID string
}

func NewFlowRequestHandler(temporalClient client.Client, pool *pgxpool.Pool, taskQueue string) *FlowRequestHandler {
Expand Down
22 changes: 16 additions & 6 deletions flow/cmd/validate_mirror.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@ import (
"context"
"errors"
"fmt"
"github.com/PeerDB-io/peer-flow/shared/telemetry"
"log/slog"

connpostgres "github.com/PeerDB-io/peer-flow/connectors/postgres"
"github.com/PeerDB-io/peer-flow/connectors/utils"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/shared/telemetry"
)

func (h *FlowRequestHandler) ValidateCDCMirror(
Expand All @@ -29,7 +29,9 @@ func (h *FlowRequestHandler) ValidateCDCMirror(

pgPeer, err := connpostgres.NewPostgresConnector(ctx, sourcePeerConfig)
if err != nil {
h.alerter.LogNonFlowWarning(ctx, telemetry.CreateMirror, req.ConnectionConfigs.FlowJobName, fmt.Sprintf("failed to create postgres connector: %v", err))
h.alerter.LogNonFlowWarning(ctx, telemetry.CreateMirror, req.ConnectionConfigs.FlowJobName,
fmt.Sprintf("failed to create postgres connector: %v", err),
)
return &protos.ValidateCDCMirrorResponse{
Ok: false,
}, fmt.Errorf("failed to create postgres connector: %v", err)
Expand All @@ -39,7 +41,9 @@ func (h *FlowRequestHandler) ValidateCDCMirror(
// Check replication connectivity
err = pgPeer.CheckReplicationConnectivity(ctx)
if err != nil {
h.alerter.LogNonFlowWarning(ctx, telemetry.CreateMirror, req.ConnectionConfigs.FlowJobName, fmt.Sprintf("unable to establish replication connectivity: %v", err))
h.alerter.LogNonFlowWarning(ctx, telemetry.CreateMirror, req.ConnectionConfigs.FlowJobName,
fmt.Sprintf("unable to establish replication connectivity: %v", err),
)
return &protos.ValidateCDCMirrorResponse{
Ok: false,
}, fmt.Errorf("unable to establish replication connectivity: %v", err)
Expand All @@ -48,7 +52,9 @@ func (h *FlowRequestHandler) ValidateCDCMirror(
// Check permissions of postgres peer
err = pgPeer.CheckReplicationPermissions(ctx, sourcePeerConfig.User)
if err != nil {
h.alerter.LogNonFlowWarning(ctx, telemetry.CreateMirror, req.ConnectionConfigs.FlowJobName, fmt.Sprintf("failed to check replication permissions: %v", err))
h.alerter.LogNonFlowWarning(ctx, telemetry.CreateMirror, req.ConnectionConfigs.FlowJobName,
fmt.Sprintf("failed to check replication permissions: %v", err),
)
return &protos.ValidateCDCMirrorResponse{
Ok: false,
}, fmt.Errorf("failed to check replication permissions: %v", err)
Expand All @@ -59,7 +65,9 @@ func (h *FlowRequestHandler) ValidateCDCMirror(
for _, tableMapping := range req.ConnectionConfigs.TableMappings {
parsedTable, parseErr := utils.ParseSchemaTable(tableMapping.SourceTableIdentifier)
if parseErr != nil {
h.alerter.LogNonFlowWarning(ctx, telemetry.CreateMirror, req.ConnectionConfigs.FlowJobName, fmt.Sprintf("invalid source table identifier: %s", tableMapping.SourceTableIdentifier))
h.alerter.LogNonFlowWarning(ctx, telemetry.CreateMirror, req.ConnectionConfigs.FlowJobName,
"invalid source table identifier: "+tableMapping.SourceTableIdentifier,
)
return &protos.ValidateCDCMirrorResponse{
Ok: false,
}, fmt.Errorf("invalid source table identifier: %s", tableMapping.SourceTableIdentifier)
Expand All @@ -72,7 +80,9 @@ func (h *FlowRequestHandler) ValidateCDCMirror(
if pubName != "" {
err = pgPeer.CheckSourceTables(ctx, sourceTables, pubName)
if err != nil {
h.alerter.LogNonFlowWarning(ctx, telemetry.CreateMirror, req.ConnectionConfigs.FlowJobName, fmt.Sprintf("provided source tables invalidated: %v", err))
h.alerter.LogNonFlowWarning(ctx, telemetry.CreateMirror, req.ConnectionConfigs.FlowJobName,
fmt.Sprintf("provided source tables invalidated: %v", err),
)
return &protos.ValidateCDCMirrorResponse{
Ok: false,
}, fmt.Errorf("provided source tables invalidated: %v", err)
Expand Down
10 changes: 7 additions & 3 deletions flow/cmd/validate_peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,13 @@ package cmd
import (
"context"
"fmt"
"github.com/PeerDB-io/peer-flow/shared/telemetry"
"log/slog"

"github.com/PeerDB-io/peer-flow/connectors"
connpostgres "github.com/PeerDB-io/peer-flow/connectors/postgres"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/shared"
"github.com/PeerDB-io/peer-flow/shared/telemetry"
)

func (h *FlowRequestHandler) ValidatePeer(
Expand Down Expand Up @@ -60,7 +60,9 @@ func (h *FlowRequestHandler) ValidatePeer(
validationConn, ok := conn.(connectors.ValidationConnector)
if ok {
validErr := validationConn.ValidateCheck(ctx)
h.alerter.LogNonFlowWarning(ctx, telemetry.CreatePeer, req.Peer.Name, fmt.Sprintf("Failed to validate peer %s: %v", req.Peer.Name, validErr))
h.alerter.LogNonFlowWarning(ctx, telemetry.CreatePeer, req.Peer.Name,
fmt.Sprintf("Failed to validate peer %s: %v", req.Peer.Name, validErr),
)
if validErr != nil {
return &protos.ValidatePeerResponse{
Status: protos.ValidatePeerStatus_INVALID,
Expand All @@ -72,7 +74,9 @@ func (h *FlowRequestHandler) ValidatePeer(

connErr := conn.ConnectionActive(ctx)
if connErr != nil {
h.alerter.LogNonFlowWarning(ctx, telemetry.CreatePeer, req.Peer.Name, fmt.Sprintf("Failed to establish peer connection %s: %v", req.Peer.Name, connErr))
h.alerter.LogNonFlowWarning(ctx, telemetry.CreatePeer, req.Peer.Name,
fmt.Sprintf("Failed to establish peer connection %s: %v", req.Peer.Name, connErr),
)
return &protos.ValidatePeerResponse{
Status: protos.ValidatePeerStatus_INVALID,
Message: fmt.Sprintf("failed to establish active connection to %s peer %s: %v",
Expand Down

0 comments on commit adc17a9

Please sign in to comment.