Skip to content

Commit

Permalink
feat: deduplicate error logging
Browse files Browse the repository at this point in the history
  • Loading branch information
iamKunalGupta committed Apr 25, 2024
1 parent adc17a9 commit ce5ace7
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 18 deletions.
25 changes: 15 additions & 10 deletions flow/cmd/validate_mirror.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,48 +29,52 @@ func (h *FlowRequestHandler) ValidateCDCMirror(

pgPeer, err := connpostgres.NewPostgresConnector(ctx, sourcePeerConfig)
if err != nil {
displayErr := fmt.Errorf("failed to create postgres connector: %v", err)
h.alerter.LogNonFlowWarning(ctx, telemetry.CreateMirror, req.ConnectionConfigs.FlowJobName,
fmt.Sprintf("failed to create postgres connector: %v", err),
fmt.Sprint(displayErr),
)
return &protos.ValidateCDCMirrorResponse{
Ok: false,
}, fmt.Errorf("failed to create postgres connector: %v", err)
}, displayErr
}
defer pgPeer.Close()

// Check replication connectivity
err = pgPeer.CheckReplicationConnectivity(ctx)
if err != nil {
displayErr := fmt.Errorf("unable to establish replication connectivity: %v", err)
h.alerter.LogNonFlowWarning(ctx, telemetry.CreateMirror, req.ConnectionConfigs.FlowJobName,
fmt.Sprintf("unable to establish replication connectivity: %v", err),
fmt.Sprint(displayErr),
)
return &protos.ValidateCDCMirrorResponse{
Ok: false,
}, fmt.Errorf("unable to establish replication connectivity: %v", err)
}, displayErr
}

// Check permissions of postgres peer
err = pgPeer.CheckReplicationPermissions(ctx, sourcePeerConfig.User)
if err != nil {
displayErr := fmt.Errorf("failed to check replication permissions: %v", err)
h.alerter.LogNonFlowWarning(ctx, telemetry.CreateMirror, req.ConnectionConfigs.FlowJobName,
fmt.Sprintf("failed to check replication permissions: %v", err),
fmt.Sprint(displayErr),
)
return &protos.ValidateCDCMirrorResponse{
Ok: false,
}, fmt.Errorf("failed to check replication permissions: %v", err)
}, displayErr
}

// Check source tables
sourceTables := make([]*utils.SchemaTable, 0, len(req.ConnectionConfigs.TableMappings))
for _, tableMapping := range req.ConnectionConfigs.TableMappings {
parsedTable, parseErr := utils.ParseSchemaTable(tableMapping.SourceTableIdentifier)
if parseErr != nil {
displayErr := fmt.Errorf("invalid source table identifier: %s", parseErr)
h.alerter.LogNonFlowWarning(ctx, telemetry.CreateMirror, req.ConnectionConfigs.FlowJobName,
"invalid source table identifier: "+tableMapping.SourceTableIdentifier,
fmt.Sprint(displayErr),
)
return &protos.ValidateCDCMirrorResponse{
Ok: false,
}, fmt.Errorf("invalid source table identifier: %s", tableMapping.SourceTableIdentifier)
}, displayErr
}

sourceTables = append(sourceTables, parsedTable)
Expand All @@ -80,12 +84,13 @@ func (h *FlowRequestHandler) ValidateCDCMirror(
if pubName != "" {
err = pgPeer.CheckSourceTables(ctx, sourceTables, pubName)
if err != nil {
displayErr := fmt.Errorf("provided source tables invalidated: %v", err)
h.alerter.LogNonFlowWarning(ctx, telemetry.CreateMirror, req.ConnectionConfigs.FlowJobName,
fmt.Sprintf("provided source tables invalidated: %v", err),
fmt.Sprint(displayErr),
)
return &protos.ValidateCDCMirrorResponse{
Ok: false,
}, fmt.Errorf("provided source tables invalidated: %v", err)
}, displayErr
}
}

Expand Down
16 changes: 8 additions & 8 deletions flow/cmd/validate_peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,27 +60,27 @@ func (h *FlowRequestHandler) ValidatePeer(
validationConn, ok := conn.(connectors.ValidationConnector)
if ok {
validErr := validationConn.ValidateCheck(ctx)
displayErr := fmt.Sprintf("failed to validate peer %s: %v", req.Peer.Name, validErr)
h.alerter.LogNonFlowWarning(ctx, telemetry.CreatePeer, req.Peer.Name,
fmt.Sprintf("Failed to validate peer %s: %v", req.Peer.Name, validErr),
displayErr,
)
if validErr != nil {
return &protos.ValidatePeerResponse{
Status: protos.ValidatePeerStatus_INVALID,
Message: fmt.Sprintf("failed to validate %s peer %s: %v",
req.Peer.Type, req.Peer.Name, validErr),
Status: protos.ValidatePeerStatus_INVALID,
Message: displayErr,
}, nil
}
}

connErr := conn.ConnectionActive(ctx)
if connErr != nil {
displayErr := fmt.Sprintf("failed to establish active connection to %s peer %s: %v", req.Peer.Type, req.Peer.Name, connErr)
h.alerter.LogNonFlowWarning(ctx, telemetry.CreatePeer, req.Peer.Name,
fmt.Sprintf("Failed to establish peer connection %s: %v", req.Peer.Name, connErr),
displayErr,
)
return &protos.ValidatePeerResponse{
Status: protos.ValidatePeerStatus_INVALID,
Message: fmt.Sprintf("failed to establish active connection to %s peer %s: %v",
req.Peer.Type, req.Peer.Name, connErr),
Status: protos.ValidatePeerStatus_INVALID,
Message: displayErr,
}, nil
}

Expand Down

0 comments on commit ce5ace7

Please sign in to comment.