Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Flow API: Custom resume endpoint, polished mirror status endpoint, auth to all endpoints #1711

Closed
wants to merge 33 commits into from
Closed
Show file tree
Hide file tree
Changes from 20 commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
4a252fb
boilerplate
Amogh-Bharadwaj May 13, 2024
0f081ad
check limit after normalise
Amogh-Bharadwaj May 13, 2024
9711c1d
remove log
Amogh-Bharadwaj May 13, 2024
8b1cfd9
lint for ui and go
Amogh-Bharadwaj May 13, 2024
097d418
use flow update config instead
Amogh-Bharadwaj May 13, 2024
8ba424f
fix the wiring
Amogh-Bharadwaj May 14, 2024
65dbf82
refactoring and status endpoint
Amogh-Bharadwaj May 27, 2024
22b472b
lint
Amogh-Bharadwaj May 27, 2024
e21dcde
revert core.go
Amogh-Bharadwaj May 27, 2024
fda6904
change authentication to use password
Amogh-Bharadwaj May 27, 2024
46bf128
suggested changes
Amogh-Bharadwaj May 27, 2024
7bc7151
use existing status endpoint, adapt UI wiring
Amogh-Bharadwaj May 28, 2024
aa0b33e
use bcrypt instead
Amogh-Bharadwaj May 29, 2024
678e1ad
fix import
Amogh-Bharadwaj May 29, 2024
87809a9
remove password from docker
Amogh-Bharadwaj May 29, 2024
f0d9909
lint
Amogh-Bharadwaj May 30, 2024
e5260b0
import fix attempt
Amogh-Bharadwaj May 30, 2024
18883e7
import fix attempt 2
Amogh-Bharadwaj May 30, 2024
b567929
fix lint
iskakaushik May 30, 2024
664cc79
feat(auth): add auth to all flow-api endpoints
iamKunalGupta Jun 6, 2024
adb7f64
feat: add auth interceptor to nexus
iamKunalGupta Jun 6, 2024
84f0f0d
chore(merge): branch 'main' of api/custom-resume-endpoint
iamKunalGupta Jun 6, 2024
f1e5f4b
chore: small function param name change
iamKunalGupta Jun 6, 2024
6f0af22
fix(ui): client side token hash issue
iamKunalGupta Jun 6, 2024
ebdf444
fix(ui): separate token component to hopefully fix build failure
iamKunalGupta Jun 6, 2024
c3c0847
feedback
serprex Jun 6, 2024
682f830
No axios
serprex Jun 6, 2024
f05234e
npm run format
serprex Jun 6, 2024
5666d50
headers go in headers
serprex Jun 6, 2024
7d3c687
Merge branch 'main' into api/custom-resume-endpoint
serprex Jun 6, 2024
1798a85
back to basics (auth) and route fixes
heavycrystal Jun 10, 2024
dabb376
fix review feedback pt.1
heavycrystal Jun 11, 2024
b14d2ae
remove bad logs
heavycrystal Jun 12, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docker-compose-dev.yml
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ services:
- 8113:8113
environment:
<<: [*catalog-config, *flow-worker-env, *minio-config]
PEERDB_PASSWORD:
depends_on:
temporal-admin-tools:
condition: service_healthy
Expand Down
1 change: 1 addition & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ services:
- 8113:8113
environment:
<<: [*catalog-config, *flow-worker-env, *minio-config]
PEERDB_PASSWORD:
depends_on:
temporal-admin-tools:
condition: service_healthy
Expand Down
12 changes: 11 additions & 1 deletion flow/cmd/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (

"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/logger"
"github.com/PeerDB-io/peer-flow/middleware"
"github.com/PeerDB-io/peer-flow/peerdbenv"
"github.com/PeerDB-io/peer-flow/shared"
peerflow "github.com/PeerDB-io/peer-flow/workflows"
Expand Down Expand Up @@ -115,7 +116,16 @@ func APIMain(ctx context.Context, args *APIServerParams) error {
return fmt.Errorf("unable to create Temporal client: %w", err)
}

grpcServer := grpc.NewServer()
healthMethods := []string{
grpc_health_v1.Health_Check_FullMethodName,
grpc_health_v1.Health_Watch_FullMethodName,
}
grpcServer := grpc.NewServer(
grpc.ChainUnaryInterceptor(
middleware.CreateRequestLoggingInterceptor(healthMethods),
middleware.CreateAuthServerInterceptor(ctx, peerdbenv.PeerDBPassword(), healthMethods),
),
)

catalogConn, err := peerdbenv.GetCatalogConnectionPoolFromEnv(ctx)
if err != nil {
Expand Down
109 changes: 109 additions & 0 deletions flow/cmd/custom_sync.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
package cmd

import (
"context"
"fmt"
"log/slog"

"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/peerdbenv"
peerflow "github.com/PeerDB-io/peer-flow/workflows"
)

const peerdbPauseGuideDocLink = "https://docs.peerdb.io/features/pause-mirror"

func (h *FlowRequestHandler) CustomSyncFlow(
ctx context.Context, req *protos.CreateCustomSyncRequest,
) (*protos.CreateCustomSyncResponse, error) {
errResponse := &protos.CreateCustomSyncResponse{
FlowJobName: req.FlowJobName,
NumberOfSyncs: 0,
ErrorMessage: "error while processing request",
Ok: false,
}

if req.FlowJobName == "" {
errResponse.ErrorMessage = "Mirror name cannot be empty."
return errResponse, nil
}

if req.NumberOfSyncs <= 0 || req.NumberOfSyncs > peerflow.MaxSyncsPerCdcFlow {
slog.Error("Invalid sync number request",
slog.Any("requested_number_of_syncs", req.NumberOfSyncs))
errResponse.ErrorMessage = fmt.Sprintf("Sync number request must be between 1 and %d (inclusive). Requested number: %d",
peerflow.MaxSyncsPerCdcFlow, req.NumberOfSyncs)
return errResponse, nil
}

mirrorExists, err := h.CheckIfMirrorNameExists(ctx, req.FlowJobName)
if err != nil {
slog.Error("Server error: unable to check if mirror exists", slog.Any("error", err))
errResponse.ErrorMessage = "Server error: unable to check if mirror " + req.FlowJobName + " exists."
return errResponse, nil
}
if !mirrorExists {
slog.Error("Mirror does not exist", slog.Any("mirror_name", req.FlowJobName))
errResponse.ErrorMessage = fmt.Sprintf("Mirror %s does not exist", req.FlowJobName)
return errResponse, nil
}

mirrorStatusResponse, _ := h.MirrorStatus(ctx, &protos.MirrorStatusRequest{
FlowJobName: req.FlowJobName,
})
if mirrorStatusResponse.ErrorMessage != "" {
slog.Error("Server error: unable to check the status of mirror",
slog.Any("mirror", req.FlowJobName),
slog.Any("error", mirrorStatusResponse.ErrorMessage))
errResponse.ErrorMessage = fmt.Sprintf("Server error: unable to check the status of mirror %s: %s",
req.FlowJobName, mirrorStatusResponse.ErrorMessage)
return errResponse, nil
}

if mirrorStatusResponse.CurrentFlowState != protos.FlowStatus_STATUS_PAUSED {
slog.Error("Mirror is not paused", slog.Any("mirror", req.FlowJobName))
errResponse.ErrorMessage = fmt.Sprintf(`Requested mirror %s is not paused. This is a requirement.
The mirror can be paused via PeerDB UI. Please follow %s`,
req.FlowJobName, peerdbPauseGuideDocLink)
return errResponse, nil
}

// Parallel sync-normalise should not be enabled
parallelSyncNormaliseEnabled := peerdbenv.PeerDBEnableParallelSyncNormalize()
if parallelSyncNormaliseEnabled {
errResponse.ErrorMessage = "Parallel sync-normalise is enabled. Please contact PeerDB support to disable it to proceed."
return errResponse, nil
}
// ---- REQUEST VALIDATED ----

// Resume mirror with custom sync number
_, err = h.FlowStateChange(ctx, &protos.FlowStateChangeRequest{
FlowJobName: req.FlowJobName,
RequestedFlowState: protos.FlowStatus_STATUS_RUNNING,
FlowConfigUpdate: &protos.FlowConfigUpdate{
Update: &protos.FlowConfigUpdate_CdcFlowConfigUpdate{
CdcFlowConfigUpdate: &protos.CDCFlowConfigUpdate{
NumberOfSyncs: req.NumberOfSyncs,
},
},
},
})
if err != nil {
slog.Error("Unable to kick off custom sync for mirror",
slog.Any("mirror", req.FlowJobName),
slog.Any("error", err))
errResponse.ErrorMessage = fmt.Sprintf("Unable to kick off sync for mirror %s:%s",
req.FlowJobName, err.Error())
return errResponse, nil
}

slog.Info("Custom sync started for mirror",
slog.String("mirror", req.FlowJobName),
slog.Int("number_of_syncs", int(req.NumberOfSyncs)))

return &protos.CreateCustomSyncResponse{
FlowJobName: req.FlowJobName,
NumberOfSyncs: req.NumberOfSyncs,
ErrorMessage: "",
Ok: true,
}, nil
}
93 changes: 61 additions & 32 deletions flow/cmd/mirror_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,58 +19,87 @@ func (h *FlowRequestHandler) MirrorStatus(
ctx context.Context,
req *protos.MirrorStatusRequest,
) (*protos.MirrorStatusResponse, error) {
slog.Info("Mirror status endpoint called", slog.String(string(shared.FlowNameKey), req.FlowJobName))
cdcFlow, err := h.isCDCFlow(ctx, req.FlowJobName)
if err != nil {
slog.Error("unable to query flow", slog.Any("error", err))
return &protos.MirrorStatusResponse{
ErrorMessage: "unable to query flow: " + err.Error(),
}, nil
}
slog.Info("Mirror status endpoint called",
slog.Bool("includeFlowInfo", req.IncludeFlowInfo),
slog.String(string(shared.FlowNameKey), req.FlowJobName))

workflowID, err := h.getWorkflowID(ctx, req.FlowJobName)
if err != nil {
return nil, err
return &protos.MirrorStatusResponse{
FlowJobName: req.FlowJobName,
CurrentFlowState: protos.FlowStatus_STATUS_UNKNOWN,
ErrorMessage: "unable to get the workflow ID of mirror " + req.FlowJobName,
Ok: false,
}, nil
}

currState, err := h.getWorkflowStatus(ctx, workflowID)
if err != nil {
return &protos.MirrorStatusResponse{
ErrorMessage: "unable to get flow state: " + err.Error(),
FlowJobName: req.FlowJobName,
CurrentFlowState: protos.FlowStatus_STATUS_UNKNOWN,
ErrorMessage: "unable to get the running status of mirror " + req.FlowJobName,
Ok: false,
}, nil
}

if cdcFlow {
cdcStatus, err := h.CDCFlowStatus(ctx, req)
if req.IncludeFlowInfo {
cdcFlow, err := h.isCDCFlow(ctx, req.FlowJobName)
if err != nil {
slog.Error("unable to query flow", slog.Any("error", err))
return &protos.MirrorStatusResponse{
ErrorMessage: "unable to query flow: " + err.Error(),
FlowJobName: req.FlowJobName,
CurrentFlowState: protos.FlowStatus_STATUS_UNKNOWN,
ErrorMessage: "unable to determine if mirror" + req.FlowJobName + "is of type CDC.",
Ok: false,
}, nil
}
if cdcFlow {
cdcStatus, err := h.CDCFlowStatus(ctx, req)
if err != nil {
return &protos.MirrorStatusResponse{
FlowJobName: req.FlowJobName,
CurrentFlowState: protos.FlowStatus_STATUS_UNKNOWN,
ErrorMessage: "unable to obtain CDC information for mirror " + req.FlowJobName,
Ok: false,
}, nil
}

return &protos.MirrorStatusResponse{
FlowJobName: req.FlowJobName,
Status: &protos.MirrorStatusResponse_CdcStatus{
CdcStatus: cdcStatus,
},
CurrentFlowState: currState,
}, nil
} else {
qrepStatus, err := h.QRepFlowStatus(ctx, req)
if err != nil {
return &protos.MirrorStatusResponse{
ErrorMessage: "unable to query flow: " + err.Error(),
FlowJobName: req.FlowJobName,
Status: &protos.MirrorStatusResponse_CdcStatus{
CdcStatus: cdcStatus,
},
CurrentFlowState: currState,
Ok: true,
}, nil
}
} else {
qrepStatus, err := h.QRepFlowStatus(ctx, req)
if err != nil {
return &protos.MirrorStatusResponse{
FlowJobName: req.FlowJobName,
CurrentFlowState: protos.FlowStatus_STATUS_UNKNOWN,
ErrorMessage: "unable to obtain snapshot information for mirror " + req.FlowJobName,
Ok: false,
}, nil
}

return &protos.MirrorStatusResponse{
FlowJobName: req.FlowJobName,
Status: &protos.MirrorStatusResponse_QrepStatus{
QrepStatus: qrepStatus,
},
CurrentFlowState: currState,
}, nil
return &protos.MirrorStatusResponse{
FlowJobName: req.FlowJobName,
Status: &protos.MirrorStatusResponse_QrepStatus{
QrepStatus: qrepStatus,
},
CurrentFlowState: currState,
Ok: true,
}, nil
}
}

return &protos.MirrorStatusResponse{
FlowJobName: req.FlowJobName,
CurrentFlowState: currState,
Ok: true,
}, nil
}

func (h *FlowRequestHandler) CDCFlowStatus(
Expand Down
2 changes: 1 addition & 1 deletion flow/cmd/validate_mirror.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ func (h *FlowRequestHandler) CheckIfMirrorNameExists(ctx context.Context, mirror
var nameExists pgtype.Bool
err := h.pool.QueryRow(ctx, "SELECT EXISTS(SELECT * FROM flows WHERE name = $1)", mirrorName).Scan(&nameExists)
if err != nil {
return true, fmt.Errorf("failed to check if mirror name exists: %v", err)
return false, fmt.Errorf("failed to check if mirror name exists: %v", err)
}

return nameExists.Bool, nil
Expand Down
45 changes: 45 additions & 0 deletions flow/middleware/auth.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package middleware

import (
"context"
"encoding/base64"
"log/slog"
"strings"

"golang.org/x/crypto/bcrypt"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"

"github.com/PeerDB-io/peer-flow/logger"
)

// Authorize checks the authorization metadata and compares the incoming bearer token with the plaintext
func Authorize(ctx context.Context, plaintext string) (context.Context, error) {
md, _ := metadata.FromIncomingContext(ctx)
if len(md["authorization"]) == 0 {
return nil, status.Errorf(codes.Unauthenticated, "Authorization token is required")
}
headerValue := md["authorization"][0]
bearerPrefix := "Bearer "
if !strings.HasPrefix(headerValue, bearerPrefix) {
return nil, status.Errorf(codes.Unauthenticated, "Unsupported authorization type")
}
base64Token := strings.TrimPrefix(headerValue, bearerPrefix)
if base64Token == "" {
return nil, status.Errorf(codes.Unauthenticated, "Authorization token is required")
}
// Always a good practice to have the actual token in base64
tokenBytes, err := base64.StdEncoding.DecodeString(base64Token)
if err != nil {
logger.LoggerFromCtx(ctx).Warn("Error decoding token", slog.String("token", base64Token), slog.Any("error", err))
return nil, status.Errorf(codes.Unauthenticated, "Authentication failed")
}
token := string(tokenBytes)
err = bcrypt.CompareHashAndPassword([]byte(token), []byte(plaintext))
if err != nil {
logger.LoggerFromCtx(ctx).Warn("Error validating token", slog.String("token", token), slog.Any("error", err))
return nil, status.Errorf(codes.Unauthenticated, "Authentication failed")
}
return ctx, nil
}
Loading