diff --git a/flow/cmd/validate_mirror.go b/flow/cmd/validate_mirror.go index 87e34ad724..59fdc37d0a 100644 --- a/flow/cmd/validate_mirror.go +++ b/flow/cmd/validate_mirror.go @@ -29,35 +29,38 @@ func (h *FlowRequestHandler) ValidateCDCMirror( pgPeer, err := connpostgres.NewPostgresConnector(ctx, sourcePeerConfig) if err != nil { + displayErr := fmt.Errorf("failed to create postgres connector: %v", err) h.alerter.LogNonFlowWarning(ctx, telemetry.CreateMirror, req.ConnectionConfigs.FlowJobName, - fmt.Sprintf("failed to create postgres connector: %v", err), + fmt.Sprint(displayErr), ) return &protos.ValidateCDCMirrorResponse{ Ok: false, - }, fmt.Errorf("failed to create postgres connector: %v", err) + }, displayErr } defer pgPeer.Close() // Check replication connectivity err = pgPeer.CheckReplicationConnectivity(ctx) if err != nil { + displayErr := fmt.Errorf("unable to establish replication connectivity: %v", err) h.alerter.LogNonFlowWarning(ctx, telemetry.CreateMirror, req.ConnectionConfigs.FlowJobName, - fmt.Sprintf("unable to establish replication connectivity: %v", err), + fmt.Sprint(displayErr), ) return &protos.ValidateCDCMirrorResponse{ Ok: false, - }, fmt.Errorf("unable to establish replication connectivity: %v", err) + }, displayErr } // Check permissions of postgres peer err = pgPeer.CheckReplicationPermissions(ctx, sourcePeerConfig.User) if err != nil { + displayErr := fmt.Errorf("failed to check replication permissions: %v", err) h.alerter.LogNonFlowWarning(ctx, telemetry.CreateMirror, req.ConnectionConfigs.FlowJobName, - fmt.Sprintf("failed to check replication permissions: %v", err), + fmt.Sprint(displayErr), ) return &protos.ValidateCDCMirrorResponse{ Ok: false, - }, fmt.Errorf("failed to check replication permissions: %v", err) + }, displayErr } // Check source tables @@ -65,12 +68,13 @@ func (h *FlowRequestHandler) ValidateCDCMirror( for _, tableMapping := range req.ConnectionConfigs.TableMappings { parsedTable, parseErr := utils.ParseSchemaTable(tableMapping.SourceTableIdentifier) if parseErr != nil { + displayErr := fmt.Errorf("invalid source table identifier: %s", parseErr) h.alerter.LogNonFlowWarning(ctx, telemetry.CreateMirror, req.ConnectionConfigs.FlowJobName, - "invalid source table identifier: "+tableMapping.SourceTableIdentifier, + fmt.Sprint(displayErr), ) return &protos.ValidateCDCMirrorResponse{ Ok: false, - }, fmt.Errorf("invalid source table identifier: %s", tableMapping.SourceTableIdentifier) + }, displayErr } sourceTables = append(sourceTables, parsedTable) @@ -80,12 +84,13 @@ func (h *FlowRequestHandler) ValidateCDCMirror( if pubName != "" { err = pgPeer.CheckSourceTables(ctx, sourceTables, pubName) if err != nil { + displayErr := fmt.Errorf("provided source tables invalidated: %v", err) h.alerter.LogNonFlowWarning(ctx, telemetry.CreateMirror, req.ConnectionConfigs.FlowJobName, - fmt.Sprintf("provided source tables invalidated: %v", err), + fmt.Sprint(displayErr), ) return &protos.ValidateCDCMirrorResponse{ Ok: false, - }, fmt.Errorf("provided source tables invalidated: %v", err) + }, displayErr } } diff --git a/flow/cmd/validate_peer.go b/flow/cmd/validate_peer.go index 698bc7670c..e83fbf305f 100644 --- a/flow/cmd/validate_peer.go +++ b/flow/cmd/validate_peer.go @@ -60,27 +60,27 @@ func (h *FlowRequestHandler) ValidatePeer( validationConn, ok := conn.(connectors.ValidationConnector) if ok { validErr := validationConn.ValidateCheck(ctx) + displayErr := fmt.Sprintf("failed to validate peer %s: %v", req.Peer.Name, validErr) h.alerter.LogNonFlowWarning(ctx, telemetry.CreatePeer, req.Peer.Name, - fmt.Sprintf("Failed to validate peer %s: %v", req.Peer.Name, validErr), + displayErr, ) if validErr != nil { return &protos.ValidatePeerResponse{ - Status: protos.ValidatePeerStatus_INVALID, - Message: fmt.Sprintf("failed to validate %s peer %s: %v", - req.Peer.Type, req.Peer.Name, validErr), + Status: protos.ValidatePeerStatus_INVALID, + Message: displayErr, }, nil } } connErr := conn.ConnectionActive(ctx) if connErr != nil { + displayErr := fmt.Sprintf("failed to establish active connection to %s peer %s: %v", req.Peer.Type, req.Peer.Name, connErr) h.alerter.LogNonFlowWarning(ctx, telemetry.CreatePeer, req.Peer.Name, - fmt.Sprintf("Failed to establish peer connection %s: %v", req.Peer.Name, connErr), + displayErr, ) return &protos.ValidatePeerResponse{ - Status: protos.ValidatePeerStatus_INVALID, - Message: fmt.Sprintf("failed to establish active connection to %s peer %s: %v", - req.Peer.Type, req.Peer.Name, connErr), + Status: protos.ValidatePeerStatus_INVALID, + Message: displayErr, }, nil }