diff --git a/flow/cmd/handler.go b/flow/cmd/handler.go index d3674f5b27..a7538a814e 100644 --- a/flow/cmd/handler.go +++ b/flow/cmd/handler.go @@ -3,7 +3,6 @@ package cmd import ( "context" "fmt" - "github.com/PeerDB-io/peer-flow/alerting" "log/slog" "strconv" "strings" @@ -15,6 +14,7 @@ import ( "go.temporal.io/sdk/client" "google.golang.org/protobuf/proto" + "github.com/PeerDB-io/peer-flow/alerting" "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/PeerDB-io/peer-flow/model" "github.com/PeerDB-io/peer-flow/shared" @@ -26,8 +26,8 @@ type FlowRequestHandler struct { protos.UnimplementedFlowServiceServer temporalClient client.Client pool *pgxpool.Pool - peerflowTaskQueueID string alerter *alerting.Alerter + peerflowTaskQueueID string } func NewFlowRequestHandler(temporalClient client.Client, pool *pgxpool.Pool, taskQueue string) *FlowRequestHandler { diff --git a/flow/cmd/validate_mirror.go b/flow/cmd/validate_mirror.go index 16b04f5133..87e34ad724 100644 --- a/flow/cmd/validate_mirror.go +++ b/flow/cmd/validate_mirror.go @@ -4,12 +4,12 @@ import ( "context" "errors" "fmt" - "github.com/PeerDB-io/peer-flow/shared/telemetry" "log/slog" connpostgres "github.com/PeerDB-io/peer-flow/connectors/postgres" "github.com/PeerDB-io/peer-flow/connectors/utils" "github.com/PeerDB-io/peer-flow/generated/protos" + "github.com/PeerDB-io/peer-flow/shared/telemetry" ) func (h *FlowRequestHandler) ValidateCDCMirror( @@ -29,7 +29,9 @@ func (h *FlowRequestHandler) ValidateCDCMirror( pgPeer, err := connpostgres.NewPostgresConnector(ctx, sourcePeerConfig) if err != nil { - h.alerter.LogNonFlowWarning(ctx, telemetry.CreateMirror, req.ConnectionConfigs.FlowJobName, fmt.Sprintf("failed to create postgres connector: %v", err)) + h.alerter.LogNonFlowWarning(ctx, telemetry.CreateMirror, req.ConnectionConfigs.FlowJobName, + fmt.Sprintf("failed to create postgres connector: %v", err), + ) return &protos.ValidateCDCMirrorResponse{ Ok: false, }, fmt.Errorf("failed to create postgres connector: %v", err) @@ -39,7 +41,9 @@ func (h *FlowRequestHandler) ValidateCDCMirror( // Check replication connectivity err = pgPeer.CheckReplicationConnectivity(ctx) if err != nil { - h.alerter.LogNonFlowWarning(ctx, telemetry.CreateMirror, req.ConnectionConfigs.FlowJobName, fmt.Sprintf("unable to establish replication connectivity: %v", err)) + h.alerter.LogNonFlowWarning(ctx, telemetry.CreateMirror, req.ConnectionConfigs.FlowJobName, + fmt.Sprintf("unable to establish replication connectivity: %v", err), + ) return &protos.ValidateCDCMirrorResponse{ Ok: false, }, fmt.Errorf("unable to establish replication connectivity: %v", err) @@ -48,7 +52,9 @@ func (h *FlowRequestHandler) ValidateCDCMirror( // Check permissions of postgres peer err = pgPeer.CheckReplicationPermissions(ctx, sourcePeerConfig.User) if err != nil { - h.alerter.LogNonFlowWarning(ctx, telemetry.CreateMirror, req.ConnectionConfigs.FlowJobName, fmt.Sprintf("failed to check replication permissions: %v", err)) + h.alerter.LogNonFlowWarning(ctx, telemetry.CreateMirror, req.ConnectionConfigs.FlowJobName, + fmt.Sprintf("failed to check replication permissions: %v", err), + ) return &protos.ValidateCDCMirrorResponse{ Ok: false, }, fmt.Errorf("failed to check replication permissions: %v", err) @@ -59,7 +65,9 @@ func (h *FlowRequestHandler) ValidateCDCMirror( for _, tableMapping := range req.ConnectionConfigs.TableMappings { parsedTable, parseErr := utils.ParseSchemaTable(tableMapping.SourceTableIdentifier) if parseErr != nil { - h.alerter.LogNonFlowWarning(ctx, telemetry.CreateMirror, req.ConnectionConfigs.FlowJobName, fmt.Sprintf("invalid source table identifier: %s", tableMapping.SourceTableIdentifier)) + h.alerter.LogNonFlowWarning(ctx, telemetry.CreateMirror, req.ConnectionConfigs.FlowJobName, + "invalid source table identifier: "+tableMapping.SourceTableIdentifier, + ) return &protos.ValidateCDCMirrorResponse{ Ok: false, }, fmt.Errorf("invalid source table identifier: %s", tableMapping.SourceTableIdentifier) @@ -72,7 +80,9 @@ func (h *FlowRequestHandler) ValidateCDCMirror( if pubName != "" { err = pgPeer.CheckSourceTables(ctx, sourceTables, pubName) if err != nil { - h.alerter.LogNonFlowWarning(ctx, telemetry.CreateMirror, req.ConnectionConfigs.FlowJobName, fmt.Sprintf("provided source tables invalidated: %v", err)) + h.alerter.LogNonFlowWarning(ctx, telemetry.CreateMirror, req.ConnectionConfigs.FlowJobName, + fmt.Sprintf("provided source tables invalidated: %v", err), + ) return &protos.ValidateCDCMirrorResponse{ Ok: false, }, fmt.Errorf("provided source tables invalidated: %v", err) diff --git a/flow/cmd/validate_peer.go b/flow/cmd/validate_peer.go index 4d92eab6ce..698bc7670c 100644 --- a/flow/cmd/validate_peer.go +++ b/flow/cmd/validate_peer.go @@ -3,13 +3,13 @@ package cmd import ( "context" "fmt" - "github.com/PeerDB-io/peer-flow/shared/telemetry" "log/slog" "github.com/PeerDB-io/peer-flow/connectors" connpostgres "github.com/PeerDB-io/peer-flow/connectors/postgres" "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/PeerDB-io/peer-flow/shared" + "github.com/PeerDB-io/peer-flow/shared/telemetry" ) func (h *FlowRequestHandler) ValidatePeer( @@ -60,7 +60,9 @@ func (h *FlowRequestHandler) ValidatePeer( validationConn, ok := conn.(connectors.ValidationConnector) if ok { validErr := validationConn.ValidateCheck(ctx) - h.alerter.LogNonFlowWarning(ctx, telemetry.CreatePeer, req.Peer.Name, fmt.Sprintf("Failed to validate peer %s: %v", req.Peer.Name, validErr)) + h.alerter.LogNonFlowWarning(ctx, telemetry.CreatePeer, req.Peer.Name, + fmt.Sprintf("Failed to validate peer %s: %v", req.Peer.Name, validErr), + ) if validErr != nil { return &protos.ValidatePeerResponse{ Status: protos.ValidatePeerStatus_INVALID, @@ -72,7 +74,9 @@ func (h *FlowRequestHandler) ValidatePeer( connErr := conn.ConnectionActive(ctx) if connErr != nil { - h.alerter.LogNonFlowWarning(ctx, telemetry.CreatePeer, req.Peer.Name, fmt.Sprintf("Failed to establish peer connection %s: %v", req.Peer.Name, connErr)) + h.alerter.LogNonFlowWarning(ctx, telemetry.CreatePeer, req.Peer.Name, + fmt.Sprintf("Failed to establish peer connection %s: %v", req.Peer.Name, connErr), + ) return &protos.ValidatePeerResponse{ Status: protos.ValidatePeerStatus_INVALID, Message: fmt.Sprintf("failed to establish active connection to %s peer %s: %v",