diff --git a/docker-compose-dev.yml b/docker-compose-dev.yml index 1868c755bf..2f1820d28f 100644 --- a/docker-compose-dev.yml +++ b/docker-compose-dev.yml @@ -168,7 +168,6 @@ services: environment: <<: *catalog-config PEERDB_LOG_DIR: /var/log/peerdb - PEERDB_PASSWORD: peerdb PEERDB_FLOW_SERVER_ADDRESS: grpc://flow_api:8112 RUST_LOG: info RUST_BACKTRACE: 1 @@ -189,7 +188,6 @@ services: <<: *catalog-config DATABASE_URL: postgres://postgres:postgres@catalog:5432/postgres PEERDB_FLOW_SERVER_HTTP: http://flow_api:8113 - PEERDB_PASSWORD: NEXTAUTH_SECRET: __changeme__ NEXTAUTH_URL: http://localhost:3000 depends_on: diff --git a/flow/cmd/api.go b/flow/cmd/api.go index 5b010916db..50910bef4f 100644 --- a/flow/cmd/api.go +++ b/flow/cmd/api.go @@ -23,6 +23,7 @@ import ( "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/PeerDB-io/peer-flow/logger" + "github.com/PeerDB-io/peer-flow/middleware" "github.com/PeerDB-io/peer-flow/peerdbenv" "github.com/PeerDB-io/peer-flow/shared" peerflow "github.com/PeerDB-io/peer-flow/workflows" @@ -115,7 +116,20 @@ func APIMain(ctx context.Context, args *APIServerParams) error { return fmt.Errorf("unable to create Temporal client: %w", err) } - grpcServer := grpc.NewServer() + healthMethods := []string{ + grpc_health_v1.Health_Check_FullMethodName, + grpc_health_v1.Health_Watch_FullMethodName, + } + authInterceptor, err := middleware.CreateAuthServerInterceptor(ctx, healthMethods) + if err != nil { + return fmt.Errorf("unable to create auth middleware: %w", err) + } + grpcServer := grpc.NewServer( + grpc.ChainUnaryInterceptor( + middleware.CreateRequestLoggingInterceptor(healthMethods), + authInterceptor, + ), + ) catalogConn, err := peerdbenv.GetCatalogConnectionPoolFromEnv(ctx) if err != nil { diff --git a/flow/cmd/custom_sync.go b/flow/cmd/custom_sync.go new file mode 100644 index 0000000000..27a7d97e47 --- /dev/null +++ b/flow/cmd/custom_sync.go @@ -0,0 +1,109 @@ +package cmd + +import ( + "context" + "fmt" + "log/slog" + + "github.com/PeerDB-io/peer-flow/generated/protos" + "github.com/PeerDB-io/peer-flow/peerdbenv" + peerflow "github.com/PeerDB-io/peer-flow/workflows" +) + +const peerdbPauseGuideDocLink = "https://docs.peerdb.io/features/pause-mirror" + +func (h *FlowRequestHandler) CustomSyncFlow( + ctx context.Context, req *protos.CreateCustomSyncRequest, +) (*protos.CreateCustomSyncResponse, error) { + errResponse := &protos.CreateCustomSyncResponse{ + FlowJobName: req.FlowJobName, + NumberOfSyncs: 0, + ErrorMessage: "error while processing request", + Ok: false, + } + + if req.FlowJobName == "" { + errResponse.ErrorMessage = "Mirror name cannot be empty." + return errResponse, nil + } + + if req.NumberOfSyncs <= 0 || req.NumberOfSyncs > peerflow.MaxSyncsPerCdcFlow { + slog.Error("Invalid sync number request", + slog.Any("requested_number_of_syncs", req.NumberOfSyncs)) + errResponse.ErrorMessage = fmt.Sprintf("Sync number request must be between 1 and %d (inclusive). Requested number: %d", + peerflow.MaxSyncsPerCdcFlow, req.NumberOfSyncs) + return errResponse, nil + } + + mirrorExists, err := h.CheckIfMirrorNameExists(ctx, req.FlowJobName) + if err != nil { + slog.Error("Server error: unable to check if mirror exists", slog.Any("error", err)) + errResponse.ErrorMessage = "Server error: unable to check if mirror " + req.FlowJobName + " exists." + return errResponse, nil + } + if !mirrorExists { + slog.Error("Mirror does not exist", slog.Any("mirror_name", req.FlowJobName)) + errResponse.ErrorMessage = fmt.Sprintf("Mirror %s does not exist", req.FlowJobName) + return errResponse, nil + } + + mirrorStatusResponse, _ := h.MirrorStatus(ctx, &protos.MirrorStatusRequest{ + FlowJobName: req.FlowJobName, + }) + if mirrorStatusResponse.ErrorMessage != "" { + slog.Error("Server error: unable to check the status of mirror", + slog.Any("mirror", req.FlowJobName), + slog.Any("error", mirrorStatusResponse.ErrorMessage)) + errResponse.ErrorMessage = fmt.Sprintf("Server error: unable to check the status of mirror %s: %s", + req.FlowJobName, mirrorStatusResponse.ErrorMessage) + return errResponse, nil + } + + if mirrorStatusResponse.CurrentFlowState != protos.FlowStatus_STATUS_PAUSED { + slog.Error("Mirror is not paused", slog.Any("mirror", req.FlowJobName)) + errResponse.ErrorMessage = fmt.Sprintf(`Requested mirror %s is not paused. This is a requirement. + The mirror can be paused via PeerDB UI. Please follow %s`, + req.FlowJobName, peerdbPauseGuideDocLink) + return errResponse, nil + } + + // Parallel sync-normalise should not be enabled + parallelSyncNormaliseEnabled := peerdbenv.PeerDBEnableParallelSyncNormalize() + if parallelSyncNormaliseEnabled { + errResponse.ErrorMessage = "Parallel sync-normalise is enabled. Please contact PeerDB support to disable it to proceed." + return errResponse, nil + } + // ---- REQUEST VALIDATED ---- + + // Resume mirror with custom sync number + _, err = h.FlowStateChange(ctx, &protos.FlowStateChangeRequest{ + FlowJobName: req.FlowJobName, + RequestedFlowState: protos.FlowStatus_STATUS_RUNNING, + FlowConfigUpdate: &protos.FlowConfigUpdate{ + Update: &protos.FlowConfigUpdate_CdcFlowConfigUpdate{ + CdcFlowConfigUpdate: &protos.CDCFlowConfigUpdate{ + NumberOfSyncs: req.NumberOfSyncs, + }, + }, + }, + }) + if err != nil { + slog.Error("Unable to kick off custom sync for mirror", + slog.Any("mirror", req.FlowJobName), + slog.Any("error", err)) + errResponse.ErrorMessage = fmt.Sprintf("Unable to kick off sync for mirror %s:%s", + req.FlowJobName, err.Error()) + return errResponse, nil + } + + slog.Info("Custom sync started for mirror", + slog.String("mirror", req.FlowJobName), + slog.Int("number_of_syncs", int(req.NumberOfSyncs))) + + return &protos.CreateCustomSyncResponse{ + FlowJobName: req.FlowJobName, + NumberOfSyncs: req.NumberOfSyncs, + ErrorMessage: "", + Ok: true, + }, nil +} diff --git a/flow/cmd/mirror_status.go b/flow/cmd/mirror_status.go index 06f64d34a9..411f855942 100644 --- a/flow/cmd/mirror_status.go +++ b/flow/cmd/mirror_status.go @@ -19,58 +19,87 @@ func (h *FlowRequestHandler) MirrorStatus( ctx context.Context, req *protos.MirrorStatusRequest, ) (*protos.MirrorStatusResponse, error) { - slog.Info("Mirror status endpoint called", slog.String(string(shared.FlowNameKey), req.FlowJobName)) - cdcFlow, err := h.isCDCFlow(ctx, req.FlowJobName) - if err != nil { - slog.Error("unable to query flow", slog.Any("error", err)) - return &protos.MirrorStatusResponse{ - ErrorMessage: "unable to query flow: " + err.Error(), - }, nil - } + slog.Info("Mirror status endpoint called", + slog.Bool("includeFlowInfo", req.IncludeFlowInfo), + slog.String(string(shared.FlowNameKey), req.FlowJobName)) workflowID, err := h.getWorkflowID(ctx, req.FlowJobName) if err != nil { - return nil, err + return &protos.MirrorStatusResponse{ + FlowJobName: req.FlowJobName, + CurrentFlowState: protos.FlowStatus_STATUS_UNKNOWN, + ErrorMessage: "unable to get the workflow ID of mirror " + req.FlowJobName, + Ok: false, + }, nil } currState, err := h.getWorkflowStatus(ctx, workflowID) if err != nil { return &protos.MirrorStatusResponse{ - ErrorMessage: "unable to get flow state: " + err.Error(), + FlowJobName: req.FlowJobName, + CurrentFlowState: protos.FlowStatus_STATUS_UNKNOWN, + ErrorMessage: "unable to get the running status of mirror " + req.FlowJobName, + Ok: false, }, nil } - if cdcFlow { - cdcStatus, err := h.CDCFlowStatus(ctx, req) + if req.IncludeFlowInfo { + cdcFlow, err := h.isCDCFlow(ctx, req.FlowJobName) if err != nil { + slog.Error("unable to query flow", slog.Any("error", err)) return &protos.MirrorStatusResponse{ - ErrorMessage: "unable to query flow: " + err.Error(), + FlowJobName: req.FlowJobName, + CurrentFlowState: protos.FlowStatus_STATUS_UNKNOWN, + ErrorMessage: "unable to determine if mirror" + req.FlowJobName + "is of type CDC.", + Ok: false, }, nil } + if cdcFlow { + cdcStatus, err := h.CDCFlowStatus(ctx, req) + if err != nil { + return &protos.MirrorStatusResponse{ + FlowJobName: req.FlowJobName, + CurrentFlowState: protos.FlowStatus_STATUS_UNKNOWN, + ErrorMessage: "unable to obtain CDC information for mirror " + req.FlowJobName, + Ok: false, + }, nil + } - return &protos.MirrorStatusResponse{ - FlowJobName: req.FlowJobName, - Status: &protos.MirrorStatusResponse_CdcStatus{ - CdcStatus: cdcStatus, - }, - CurrentFlowState: currState, - }, nil - } else { - qrepStatus, err := h.QRepFlowStatus(ctx, req) - if err != nil { return &protos.MirrorStatusResponse{ - ErrorMessage: "unable to query flow: " + err.Error(), + FlowJobName: req.FlowJobName, + Status: &protos.MirrorStatusResponse_CdcStatus{ + CdcStatus: cdcStatus, + }, + CurrentFlowState: currState, + Ok: true, }, nil - } + } else { + qrepStatus, err := h.QRepFlowStatus(ctx, req) + if err != nil { + return &protos.MirrorStatusResponse{ + FlowJobName: req.FlowJobName, + CurrentFlowState: protos.FlowStatus_STATUS_UNKNOWN, + ErrorMessage: "unable to obtain snapshot information for mirror " + req.FlowJobName, + Ok: false, + }, nil + } - return &protos.MirrorStatusResponse{ - FlowJobName: req.FlowJobName, - Status: &protos.MirrorStatusResponse_QrepStatus{ - QrepStatus: qrepStatus, - }, - CurrentFlowState: currState, - }, nil + return &protos.MirrorStatusResponse{ + FlowJobName: req.FlowJobName, + Status: &protos.MirrorStatusResponse_QrepStatus{ + QrepStatus: qrepStatus, + }, + CurrentFlowState: currState, + Ok: true, + }, nil + } } + + return &protos.MirrorStatusResponse{ + FlowJobName: req.FlowJobName, + CurrentFlowState: currState, + Ok: true, + }, nil } func (h *FlowRequestHandler) CDCFlowStatus( diff --git a/flow/cmd/validate_mirror.go b/flow/cmd/validate_mirror.go index b4304b0504..f67a4f1d40 100644 --- a/flow/cmd/validate_mirror.go +++ b/flow/cmd/validate_mirror.go @@ -127,7 +127,7 @@ func (h *FlowRequestHandler) CheckIfMirrorNameExists(ctx context.Context, mirror var nameExists pgtype.Bool err := h.pool.QueryRow(ctx, "SELECT EXISTS(SELECT * FROM flows WHERE name = $1)", mirrorName).Scan(&nameExists) if err != nil { - return true, fmt.Errorf("failed to check if mirror name exists: %v", err) + return false, fmt.Errorf("failed to check if mirror name exists: %v", err) } return nameExists.Bool, nil diff --git a/flow/middleware/auth.go b/flow/middleware/auth.go new file mode 100644 index 0000000000..0f26eafd36 --- /dev/null +++ b/flow/middleware/auth.go @@ -0,0 +1,42 @@ +package middleware + +import ( + "context" + "encoding/base64" + "log/slog" + "strings" + + "golang.org/x/crypto/bcrypt" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/metadata" + "google.golang.org/grpc/status" + + "github.com/PeerDB-io/peer-flow/logger" +) + +// authorize checks the authorization metadata and compares the incoming bearer token with the plaintext +func authorize(ctx context.Context, hash []byte) (context.Context, error) { + md, _ := metadata.FromIncomingContext(ctx) + if len(md["authorization"]) == 0 { + return nil, status.Errorf(codes.Unauthenticated, "Authorization token is required") + } + headerValue := md["authorization"][0] + base64Token, hasPrefix := strings.CutPrefix(headerValue, "Bearer ") + if !hasPrefix { + return nil, status.Errorf(codes.Unauthenticated, "Unsupported authorization type") + } else if base64Token == "" { + return nil, status.Errorf(codes.Unauthenticated, "Authorization token is required") + } + // Always a good practice to have the actual token in base64 + tokenBytes, err := base64.StdEncoding.DecodeString(base64Token) + if err != nil { + logger.LoggerFromCtx(ctx).Warn("Error decoding token", slog.Any("error", err)) + return nil, status.Errorf(codes.Unauthenticated, "Authentication failed") + } + + if err := bcrypt.CompareHashAndPassword(hash, tokenBytes); err != nil { + logger.LoggerFromCtx(ctx).Warn("Error validating token", slog.Any("error", err)) + return nil, status.Errorf(codes.Unauthenticated, "Authentication failed") + } + return ctx, nil +} diff --git a/flow/middleware/interceptors.go b/flow/middleware/interceptors.go new file mode 100644 index 0000000000..4df174d1d3 --- /dev/null +++ b/flow/middleware/interceptors.go @@ -0,0 +1,90 @@ +package middleware + +import ( + "context" + "log/slog" + "time" + + "golang.org/x/crypto/bcrypt" + "google.golang.org/grpc" + "google.golang.org/grpc/peer" + "google.golang.org/grpc/status" + + "github.com/PeerDB-io/peer-flow/logger" + "github.com/PeerDB-io/peer-flow/peerdbenv" +) + +func CreateAuthServerInterceptor(ctx context.Context, unauthenticatedMethods []string) (grpc.UnaryServerInterceptor, error) { + plaintext := peerdbenv.PeerDBPassword() + + if plaintext == "" { + logger.LoggerFromCtx(ctx).Warn("Authentication is disabled") + return func(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (any, error) { + return handler(ctx, req) + }, nil + } + + unauthenticatedMethodsSet := make(map[string]struct{}) + for _, method := range unauthenticatedMethods { + unauthenticatedMethodsSet[method] = struct{}{} + } + + hash, err := bcrypt.GenerateFromPassword([]byte(plaintext), bcrypt.DefaultCost) + if err != nil { + return nil, err + } + + return func(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (any, error) { + if _, ok := unauthenticatedMethodsSet[info.FullMethod]; ok { + return handler(ctx, req) + } + ctx, err := authorize(ctx, hash) + if err != nil { + return nil, err + } + return handler(ctx, req) + }, nil +} + +// CreateRequestLoggingInterceptor logs all requests +// this is important for monitoring, debugging and auditing +func CreateRequestLoggingInterceptor(ignoredMethods []string) grpc.UnaryServerInterceptor { + ignoredMethodsSet := make(map[string]struct{}) + for _, method := range ignoredMethods { + ignoredMethodsSet[method] = struct{}{} + } + return func(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (any, error) { + if _, ok := ignoredMethodsSet[info.FullMethod]; ok { + return handler(ctx, req) + } + start := time.Now() + logger.LoggerFromCtx(ctx).Info( + "Received request", + slog.String("grpc.method", info.FullMethod), + ) + resp, err := handler(ctx, req) + var errorCode string + if err != nil { + // if error is a grpc error, extract the error code + if grpcErr, ok := status.FromError(err); ok { + errorCode = grpcErr.Code().String() + } + } + // TODO maybe also look at x-forwarded-for ? + var clientIp string + if p, ok := peer.FromContext(ctx); ok { + clientIp = p.Addr.String() + } + + logger.LoggerFromCtx(ctx).Info( + "Request completed", + slog.String("grpc.method", info.FullMethod), + slog.Duration("duration", time.Since(start)), + slog.Float64("duration_seconds", time.Since(start).Seconds()), + slog.Any("error", err), + slog.String("grpc.code", errorCode), + slog.String("client_ip", clientIp), + ) + return resp, err + } +} diff --git a/flow/peerdbenv/config.go b/flow/peerdbenv/config.go index bcf1ac050e..7e844b33d8 100644 --- a/flow/peerdbenv/config.go +++ b/flow/peerdbenv/config.go @@ -21,6 +21,11 @@ func PeerDBDeploymentUID() string { return GetEnvString("PEERDB_DEPLOYMENT_UID", "") } +// PEERDB_PASSWORD: use carefully, this is a secret +func PeerDBPassword() string { + return GetEnvString("PEERDB_PASSWORD", "") +} + func PeerFlowTaskQueueName(taskQueueID shared.TaskQueueID) string { deploymentUID := PeerDBDeploymentUID() if deploymentUID == "" { diff --git a/flow/workflows/cdc_flow.go b/flow/workflows/cdc_flow.go index 51ccaf7f5f..c44587bb2a 100644 --- a/flow/workflows/cdc_flow.go +++ b/flow/workflows/cdc_flow.go @@ -47,6 +47,7 @@ func NewCDCFlowWorkflowState(cfg *protos.FlowConnectionConfigs) *CDCFlowWorkflow BatchSize: cfg.MaxBatchSize, IdleTimeoutSeconds: cfg.IdleTimeoutSeconds, TableMappings: tableMappings, + NumberOfSyncs: 0, }, } } @@ -82,7 +83,7 @@ func GetChildWorkflowID( type CDCFlowWorkflowResult = CDCFlowWorkflowState const ( - maxSyncsPerCdcFlow = 32 + MaxSyncsPerCdcFlow = 32 ) func processCDCFlowConfigUpdate( @@ -172,13 +173,16 @@ func addCdcPropertiesSignalListener( if cdcConfigUpdate.IdleTimeout > 0 { state.SyncFlowOptions.IdleTimeoutSeconds = cdcConfigUpdate.IdleTimeout } + if cdcConfigUpdate.NumberOfSyncs > 0 { + state.SyncFlowOptions.NumberOfSyncs = cdcConfigUpdate.NumberOfSyncs + } // do this irrespective of additional tables being present, for auto unpausing state.FlowConfigUpdate = cdcConfigUpdate - logger.Info("CDC Signal received. Parameters on signal reception:", slog.Int("BatchSize", int(state.SyncFlowOptions.BatchSize)), slog.Int("IdleTimeout", int(state.SyncFlowOptions.IdleTimeoutSeconds)), - slog.Any("AdditionalTables", cdcConfigUpdate.AdditionalTables)) + slog.Any("AdditionalTables", cdcConfigUpdate.AdditionalTables), + slog.Int("NumberOfSyncs", int(state.SyncFlowOptions.NumberOfSyncs))) }) } @@ -225,7 +229,6 @@ func CDCFlowWorkflow( logger := log.With(workflow.GetLogger(ctx), slog.String(string(shared.FlowNameKey), cfg.FlowJobName)) flowSignalChan := model.FlowSignal.GetSignalChannel(ctx) - err := workflow.SetQueryHandler(ctx, shared.CDCFlowStateQuery, func() (CDCFlowWorkflowState, error) { return *state, nil }) @@ -250,6 +253,7 @@ func CDCFlowWorkflow( shared.MirrorNameSearchAttribute: cfg.FlowJobName, } + var syncCountLimit int if state.ActiveSignal == model.PauseSignal { selector := workflow.NewNamedSelector(ctx, "PauseLoop") selector.AddReceive(ctx.Done(), func(_ workflow.ReceiveChannel, _ bool) {}) @@ -257,7 +261,6 @@ func CDCFlowWorkflow( state.ActiveSignal = model.FlowSignalHandler(state.ActiveSignal, val, logger) }) addCdcPropertiesSignalListener(ctx, logger, selector, state) - startTime := workflow.Now(ctx) state.CurrentFlowStatus = protos.FlowStatus_STATUS_PAUSED @@ -283,6 +286,7 @@ func CDCFlowWorkflow( if err != nil { return state, err } + syncCountLimit = int(state.SyncFlowOptions.NumberOfSyncs) logger.Info("wiping flow state after state update processing") // finished processing, wipe it state.FlowConfigUpdate = nil @@ -411,7 +415,6 @@ func CDCFlowWorkflow( var restart, finished bool syncCount := 0 - syncFlowOpts := workflow.ChildWorkflowOptions{ WorkflowID: syncFlowID, ParentClosePolicy: enums.PARENT_CLOSE_POLICY_REQUEST_CANCEL, @@ -509,6 +512,12 @@ func CDCFlowWorkflow( normDoneChan := model.NormalizeDoneSignal.GetSignalChannel(ctx) normDoneChan.Drain() normDoneChan.AddToSelector(mainLoopSelector, func(x struct{}, _ bool) { + if syncCount == syncCountLimit { + logger.Info("sync count limit reached, pausing", + slog.Int("limit", syncCountLimit), + slog.Int("count", syncCount)) + state.ActiveSignal = model.PauseSignal + } if syncFlowFuture != nil { _ = model.NormalizeDoneSignal.SignalChildWorkflow(ctx, syncFlowFuture, x).Get(ctx, nil) } @@ -528,7 +537,7 @@ func CDCFlowWorkflow( return state, err } - if state.ActiveSignal == model.PauseSignal || syncCount >= maxSyncsPerCdcFlow { + if state.ActiveSignal == model.PauseSignal || syncCount >= MaxSyncsPerCdcFlow { restart = true if syncFlowFuture != nil { err := model.SyncStopSignal.SignalChildWorkflow(ctx, syncFlowFuture, struct{}{}).Get(ctx, nil) diff --git a/nexus/Cargo.lock b/nexus/Cargo.lock index f1d56e39ba..12c5dc5f89 100644 --- a/nexus/Cargo.lock +++ b/nexus/Cargo.lock @@ -986,6 +986,7 @@ name = "flow-rs" version = "0.1.0" dependencies = [ "anyhow", + "base64 0.22.1", "catalog", "pt", "serde_json", diff --git a/nexus/flow-rs/Cargo.toml b/nexus/flow-rs/Cargo.toml index 5ae9cda2d9..69596eba19 100644 --- a/nexus/flow-rs/Cargo.toml +++ b/nexus/flow-rs/Cargo.toml @@ -10,3 +10,4 @@ tracing.workspace = true tonic-health = "0.11" pt = { path = "../pt" } catalog = { path = "../catalog" } +base64 = "0.22.1" diff --git a/nexus/flow-rs/src/grpc.rs b/nexus/flow-rs/src/grpc.rs index a07115c1ec..0957d6be5c 100644 --- a/nexus/flow-rs/src/grpc.rs +++ b/nexus/flow-rs/src/grpc.rs @@ -1,11 +1,15 @@ +use std::str::FromStr; + +use base64::Engine; +use serde_json::Value; +use tonic_health::pb::health_client; + use catalog::WorkflowDetails; use pt::{ flow_model::{FlowJob, QRepFlowJob}, peerdb_flow::{QRepWriteMode, QRepWriteType, TypeSystem}, peerdb_route, tonic, }; -use serde_json::Value; -use tonic_health::pb::health_client; pub enum PeerValidationResult { Valid, @@ -13,13 +17,29 @@ pub enum PeerValidationResult { } pub struct FlowGrpcClient { - client: peerdb_route::flow_service_client::FlowServiceClient, + client: peerdb_route::flow_service_client::FlowServiceClient< + tonic::codegen::InterceptedService, + >, health_client: health_client::HealthClient, } +struct BearerAuthInterceptor { + token: String, +} + +impl tonic::service::Interceptor for BearerAuthInterceptor { + fn call(&mut self, mut request: tonic::Request<()>) -> Result, tonic::Status> { + request.metadata_mut().insert( + "authorization", + tonic::metadata::MetadataValue::from_str(format!("Bearer {0}", self.token).as_str()).unwrap(), + ); + Ok(request) + } +} + impl FlowGrpcClient { // create a new grpc client to the flow server using flow server address - pub async fn new(flow_server_addr: &str) -> anyhow::Result { + pub async fn new(flow_server_addr: &str, password: String) -> anyhow::Result { // change protocol to grpc let flow_server_addr = flow_server_addr.replace("http", "grpc"); @@ -30,12 +50,15 @@ impl FlowGrpcClient { // Create a gRPC channel let channel = tonic::transport::Channel::from_shared(grpc_endpoint.clone())?.connect_lazy(); - // construct a grpc client to the flow server - let client = peerdb_route::flow_service_client::FlowServiceClient::new(channel.clone()); + // encode the password to base64, to send in all requests + let token = base64::prelude::BASE64_STANDARD.encode(password.as_bytes()); - // construct a health client to the flow server, use the grpc endpoint - let health_client = health_client::HealthClient::new(channel); + // use the token in all requests + let interceptor = BearerAuthInterceptor { token }; + // construct a grpc client to the flow server with an interceptor + let client= peerdb_route::flow_service_client::FlowServiceClient::with_interceptor(channel.clone(), interceptor); + let health_client = health_client::HealthClient::new(channel); Ok(Self { client, health_client, @@ -267,9 +290,9 @@ impl FlowGrpcClient { } if !cfg.initial_copy_only { if let Some(QRepWriteMode { - write_type: wt, - upsert_key_columns: _, - }) = cfg.write_mode + write_type: wt, + upsert_key_columns: _, + }) = cfg.write_mode { if wt == QRepWriteType::QrepWriteModeOverwrite as i32 { return anyhow::Result::Err(anyhow::anyhow!( diff --git a/nexus/server/src/main.rs b/nexus/server/src/main.rs index 480c6d9e0d..cd0943b41f 100644 --- a/nexus/server/src/main.rs +++ b/nexus/server/src/main.rs @@ -1214,7 +1214,10 @@ pub async fn main() -> anyhow::Result<()> { // log that we accept mirror commands if we have a flow server let flow_handler = if let Some(ref addr) = args.flow_api_url { tracing::info!("MIRROR commands enabled"); - Some(Arc::new(Mutex::new(FlowGrpcClient::new(addr).await?))) + let password = args.peerdb_password.clone(); + Some(Arc::new(Mutex::new( + FlowGrpcClient::new(addr, password).await?, + ))) } else { tracing::info!("MIRROR commands disabled"); None diff --git a/protos/flow.proto b/protos/flow.proto index c7f8e4fa25..0545ee114f 100644 --- a/protos/flow.proto +++ b/protos/flow.proto @@ -111,6 +111,7 @@ message SyncFlowOptions { map src_table_id_name_mapping = 4; map table_name_schema_mapping = 5; repeated TableMapping table_mappings = 6; + int32 number_of_syncs = 7; } message StartNormalizeInput { @@ -371,6 +372,7 @@ message CDCFlowConfigUpdate { repeated TableMapping additional_tables = 1; uint32 batch_size = 2; uint64 idle_timeout = 3; + int32 number_of_syncs = 4; } message QRepFlowConfigUpdate { diff --git a/protos/route.proto b/protos/route.proto index 61a3b89b75..6e84341399 100644 --- a/protos/route.proto +++ b/protos/route.proto @@ -25,6 +25,18 @@ message CreateQRepFlowResponse { string workflow_id = 1; } +message CreateCustomSyncRequest { + string flow_job_name = 1; + int32 number_of_syncs = 2; +} + +message CreateCustomSyncResponse { + string flow_job_name = 1; + int32 number_of_syncs = 2; + string error_message = 3; + bool ok = 4; +} + message ShutdownRequest { string workflow_id = 1; string flow_job_name = 2; @@ -79,6 +91,7 @@ message CreatePeerResponse { message MirrorStatusRequest { string flow_job_name = 1; + bool include_flow_info = 2; } message PartitionStatus { @@ -203,6 +216,7 @@ message MirrorStatusResponse { } string error_message = 4; peerdb_flow.FlowStatus current_flow_state = 5; + bool ok = 6; } message ValidateCDCMirrorResponse{ @@ -237,7 +251,7 @@ service FlowService { body: "*" }; } - rpc ValidateCDCMirror(CreateCDCFlowRequest) returns (ValidateCDCMirrorResponse) { + rpc ValidateCDCMirror(CreateCDCFlowRequest) returns (ValidateCDCMirrorResponse) { option (google.api.http) = { post: "/v1/mirrors/cdc/validate", body: "*" @@ -255,6 +269,7 @@ service FlowService { body: "*" }; } + rpc CreateCDCFlow(CreateCDCFlowRequest) returns (CreateCDCFlowResponse) { option (google.api.http) = { post: "/v1/flows/cdc/create", @@ -267,6 +282,12 @@ service FlowService { body: "*" }; } + rpc CustomSyncFlow(CreateCustomSyncRequest) returns (CreateCustomSyncResponse) { + option (google.api.http) = { + post: "/v1/flows/cdc/sync", + body: "*" + }; + } rpc GetSchemas(PostgresPeerActivityInfoRequest) returns (PeerSchemasResponse) { option (google.api.http) = { get: "/v1/peers/schemas" }; @@ -294,6 +315,7 @@ service FlowService { rpc GetStatInfo(PostgresPeerActivityInfoRequest) returns (PeerStatResponse) { option (google.api.http) = { get: "/v1/peers/stats/{peer_name}" }; } + rpc ShutdownFlow(ShutdownRequest) returns (ShutdownResponse) { option (google.api.http) = { post: "/v1/mirrors/drop", body: "*" }; } diff --git a/ui/app/api/mirrors/cdc/route.ts b/ui/app/api/mirrors/cdc/route.ts index c7c7bde379..dfdcda2ade 100644 --- a/ui/app/api/mirrors/cdc/route.ts +++ b/ui/app/api/mirrors/cdc/route.ts @@ -1,29 +1,23 @@ import { UCreateMirrorResponse } from '@/app/dto/MirrorsDTO'; +import { CreateCDCFlowRequest } from '@/grpc_generated/route'; import { - CreateCDCFlowRequest, - CreateCDCFlowResponse, -} from '@/grpc_generated/route'; -import { GetFlowHttpAddressFromEnv } from '@/rpc/http'; + GetFlowServiceHttpClient, + ParseFlowServiceErrorMessage, +} from '@/rpc/http'; export async function POST(request: Request) { const body = await request.json(); const { config } = body; - const flowServiceAddr = GetFlowHttpAddressFromEnv(); + const flowServiceClient = GetFlowServiceHttpClient(); const req: CreateCDCFlowRequest = { connectionConfigs: config, }; try { - const createStatus: CreateCDCFlowResponse = await fetch( - `${flowServiceAddr}/v1/flows/cdc/create`, - { - method: 'POST', - body: JSON.stringify(req), - } - ).then((res) => { - return res.json(); - }); - + const createStatus = await flowServiceClient.post( + `/v1/flows/cdc/create`, + req + ); if (!createStatus.workflowId) { return new Response(JSON.stringify(createStatus)); } @@ -33,6 +27,7 @@ export async function POST(request: Request) { return new Response(JSON.stringify(response)); } catch (e) { - console.log(e); + const message = await ParseFlowServiceErrorMessage(e); + console.log(message, e); } } diff --git a/ui/app/api/mirrors/cdc/validate/route.ts b/ui/app/api/mirrors/cdc/validate/route.ts index 8354dbef88..1d9ed31a0a 100644 --- a/ui/app/api/mirrors/cdc/validate/route.ts +++ b/ui/app/api/mirrors/cdc/validate/route.ts @@ -2,29 +2,25 @@ import { CreateCDCFlowRequest, ValidateCDCMirrorResponse, } from '@/grpc_generated/route'; -import { GetFlowHttpAddressFromEnv } from '@/rpc/http'; +import { + GetFlowServiceHttpClient, + ParseFlowServiceErrorMessage, +} from '@/rpc/http'; import { NextRequest } from 'next/server'; export async function POST(request: NextRequest) { const body = await request.json(); const { config } = body; - const flowServiceAddr = GetFlowHttpAddressFromEnv(); + const flowServiceClient = GetFlowServiceHttpClient(); const req: CreateCDCFlowRequest = { connectionConfigs: config, }; try { - const validateResponse: ValidateCDCMirrorResponse = await fetch( - `${flowServiceAddr}/v1/mirrors/cdc/validate`, - { - method: 'POST', - body: JSON.stringify(req), - } - ).then((res) => { - return res.json(); - }); - + const validateResponse: ValidateCDCMirrorResponse = + await flowServiceClient.post(`/v1/mirrors/cdc/validate`, req); return new Response(JSON.stringify(validateResponse)); } catch (e) { - console.log(e); + const message = await ParseFlowServiceErrorMessage(e); + console.log(message, e); } } diff --git a/ui/app/api/mirrors/drop/route.ts b/ui/app/api/mirrors/drop/route.ts index 205ab26fef..948b042eb0 100644 --- a/ui/app/api/mirrors/drop/route.ts +++ b/ui/app/api/mirrors/drop/route.ts @@ -1,11 +1,14 @@ import { UDropMirrorResponse } from '@/app/dto/MirrorsDTO'; import { ShutdownRequest, ShutdownResponse } from '@/grpc_generated/route'; -import { GetFlowHttpAddressFromEnv } from '@/rpc/http'; +import { + GetFlowServiceHttpClient, + ParseFlowServiceErrorMessage, +} from '@/rpc/http'; export async function POST(request: Request) { const body = await request.json(); const { workflowId, flowJobName, sourcePeer, destinationPeer } = body; - const flowServiceAddr = GetFlowHttpAddressFromEnv(); + const flowServiceClient = GetFlowServiceHttpClient(); const req: ShutdownRequest = { workflowId, flowJobName, @@ -15,15 +18,10 @@ export async function POST(request: Request) { }; try { - const dropStatus: ShutdownResponse = await fetch( - `${flowServiceAddr}/v1/mirrors/drop`, - { - method: 'POST', - body: JSON.stringify(req), - } - ).then((res) => { - return res.json(); - }); + const dropStatus: ShutdownResponse = await flowServiceClient.post( + `/v1/mirrors/drop`, + req + ); let response: UDropMirrorResponse = { dropped: dropStatus.ok, errorMessage: dropStatus.errorMessage, @@ -31,6 +29,7 @@ export async function POST(request: Request) { return new Response(JSON.stringify(response)); } catch (e) { - console.log(e); + const message = await ParseFlowServiceErrorMessage(e); + console.error(message, e); } } diff --git a/ui/app/api/mirrors/qrep/route.ts b/ui/app/api/mirrors/qrep/route.ts index 7febf60140..6ded5afe38 100644 --- a/ui/app/api/mirrors/qrep/route.ts +++ b/ui/app/api/mirrors/qrep/route.ts @@ -3,33 +3,32 @@ import { CreateQRepFlowRequest, CreateQRepFlowResponse, } from '@/grpc_generated/route'; -import { GetFlowHttpAddressFromEnv } from '@/rpc/http'; +import { + GetFlowServiceHttpClient, + ParseFlowServiceErrorMessage, +} from '@/rpc/http'; export async function POST(request: Request) { const body = await request.json(); const { config } = body; - const flowServiceAddr = GetFlowHttpAddressFromEnv(); + const flowServiceClient = GetFlowServiceHttpClient(); const req: CreateQRepFlowRequest = { qrepConfig: config, createCatalogEntry: true, }; try { - const createStatus: CreateQRepFlowResponse = await fetch( - `${flowServiceAddr}/v1/flows/qrep/create`, - { - method: 'POST', - body: JSON.stringify(req), - } - ).then((res) => { - return res.json(); - }); + const createStatus: CreateQRepFlowResponse = await flowServiceClient.post( + `/v1/flows/qrep/create`, + req + ); let response: UCreateMirrorResponse = { created: !!createStatus.workflowId, }; return new Response(JSON.stringify(response)); } catch (e) { - console.log(e); + const message = await ParseFlowServiceErrorMessage(e); + console.error(message, e); } } diff --git a/ui/app/api/mirrors/state/route.ts b/ui/app/api/mirrors/state/route.ts index 5cb63ccfea..3eaf305fab 100644 --- a/ui/app/api/mirrors/state/route.ts +++ b/ui/app/api/mirrors/state/route.ts @@ -1,22 +1,11 @@ -import { - MirrorStatusRequest, - MirrorStatusResponse, -} from '@/grpc_generated/route'; -import { GetFlowHttpAddressFromEnv } from '@/rpc/http'; +import { MirrorStatusRequest } from '@/grpc_generated/route'; +import { GetFlowServiceHttpClient } from '@/rpc/http'; export async function POST(request: Request) { const body: MirrorStatusRequest = await request.json(); - const flowServiceAddr = GetFlowHttpAddressFromEnv(); - try { - const res: MirrorStatusResponse = await fetch( - `${flowServiceAddr}/v1/mirrors/${body.flowJobName}`, - { cache: 'no-store' } - ).then((res) => { - return res.json(); - }); - - return new Response(JSON.stringify(res)); - } catch (e) { - console.error(e); - } + const flowServiceClient = GetFlowServiceHttpClient(); + return flowServiceClient.raw( + `/v1/mirrors/${body.flowJobName}?include_flow_info=true`, + { cache: 'no-store' } + ); } diff --git a/ui/app/api/mirrors/state_change/route.ts b/ui/app/api/mirrors/state_change/route.ts index 0d81b75e7d..c3c3ff2001 100644 --- a/ui/app/api/mirrors/state_change/route.ts +++ b/ui/app/api/mirrors/state_change/route.ts @@ -1,23 +1,20 @@ import { FlowStateChangeResponse } from '@/grpc_generated/route'; -import { GetFlowHttpAddressFromEnv } from '@/rpc/http'; +import { + GetFlowServiceHttpClient, + ParseFlowServiceErrorMessage, +} from '@/rpc/http'; export async function POST(request: Request) { const body = await request.json(); - const flowServiceAddr = GetFlowHttpAddressFromEnv(); - + const flowServiceClient = GetFlowServiceHttpClient(); try { - const res: FlowStateChangeResponse = await fetch( - `${flowServiceAddr}/v1/mirrors/state_change`, - { - method: 'POST', - body: JSON.stringify(body), - } - ).then((res) => { - return res.json(); - }); - + const res: FlowStateChangeResponse = await flowServiceClient.post( + `/v1/mirrors/state_change`, + body + ); return new Response(JSON.stringify(res)); } catch (e) { - console.error(e); + const message = await ParseFlowServiceErrorMessage(e); + console.error(message, e); } } diff --git a/ui/app/api/peers/columns/route.ts b/ui/app/api/peers/columns/route.ts index 9f45db092e..4417f9f2a2 100644 --- a/ui/app/api/peers/columns/route.ts +++ b/ui/app/api/peers/columns/route.ts @@ -1,22 +1,24 @@ import { UColumnsResponse } from '@/app/dto/PeersDTO'; import { TableColumnsResponse } from '@/grpc_generated/route'; -import { GetFlowHttpAddressFromEnv } from '@/rpc/http'; +import { + GetFlowServiceHttpClient, + ParseFlowServiceErrorMessage, +} from '@/rpc/http'; export async function POST(request: Request) { const body = await request.json(); const { peerName, schemaName, tableName } = body; - const flowServiceAddr = GetFlowHttpAddressFromEnv(); + const flowServiceClient = GetFlowServiceHttpClient(); try { - const columnsList: TableColumnsResponse = await fetch( - `${flowServiceAddr}/v1/peers/columns?peer_name=${peerName}&schema_name=${schemaName}&table_name=${tableName}` - ).then((res) => { - return res.json(); - }); + const columnsList: TableColumnsResponse = await flowServiceClient.get( + `/v1/peers/columns?peer_name=${peerName}&schema_name=${schemaName}&table_name=${tableName}` + ); let response: UColumnsResponse = { columns: columnsList.columns, }; return new Response(JSON.stringify(response)); } catch (e) { - console.log(e); + const message = await ParseFlowServiceErrorMessage(e); + console.log(message, e); } } diff --git a/ui/app/api/peers/drop/route.ts b/ui/app/api/peers/drop/route.ts index 7449480033..2bc462f16d 100644 --- a/ui/app/api/peers/drop/route.ts +++ b/ui/app/api/peers/drop/route.ts @@ -1,25 +1,20 @@ import { UDropPeerResponse } from '@/app/dto/PeersDTO'; -import { DropPeerRequest, DropPeerResponse } from '@/grpc_generated/route'; -import { GetFlowHttpAddressFromEnv } from '@/rpc/http'; +import { DropPeerRequest } from '@/grpc_generated/route'; +import { + GetFlowServiceHttpClient, + ParseFlowServiceErrorMessage, +} from '@/rpc/http'; export async function POST(request: Request) { const body = await request.json(); const { peerName } = body; - const flowServiceAddr = GetFlowHttpAddressFromEnv(); + const flowServiceClient = GetFlowServiceHttpClient(); const req: DropPeerRequest = { peerName, }; console.log('/drop/peer: req:', req); try { - const dropStatus: DropPeerResponse = await fetch( - `${flowServiceAddr}/v1/peers/drop`, - { - method: 'POST', - body: JSON.stringify(req), - } - ).then((res) => { - return res.json(); - }); + const dropStatus = await flowServiceClient.post(`/v1/peers/drop`, req); let response: UDropPeerResponse = { dropped: dropStatus.ok, errorMessage: dropStatus.errorMessage, @@ -27,6 +22,7 @@ export async function POST(request: Request) { return new Response(JSON.stringify(response)); } catch (e) { - console.log(e); + const message = await ParseFlowServiceErrorMessage(e); + console.error(message, e); } } diff --git a/ui/app/api/peers/publications/route.ts b/ui/app/api/peers/publications/route.ts index 64dcd3dcf6..e381796d5a 100644 --- a/ui/app/api/peers/publications/route.ts +++ b/ui/app/api/peers/publications/route.ts @@ -1,22 +1,26 @@ import { UPublicationsResponse } from '@/app/dto/PeersDTO'; import { PeerPublicationsResponse } from '@/grpc_generated/route'; -import { GetFlowHttpAddressFromEnv } from '@/rpc/http'; +import { + GetFlowServiceHttpClient, + ParseFlowServiceErrorMessage, +} from '@/rpc/http'; export async function POST(request: Request) { const body = await request.json(); const { peerName } = body; - const flowServiceAddr = GetFlowHttpAddressFromEnv(); + const flowServiceClient = GetFlowServiceHttpClient(); try { - const publicationList: PeerPublicationsResponse = await fetch( - `${flowServiceAddr}/v1/peers/publications?peer_name=${peerName}` - ).then((res) => { - return res.json(); - }); + const publicationList: PeerPublicationsResponse = + await flowServiceClient.get( + `/v1/peers/publications?peer_name=${peerName}` + ); let response: UPublicationsResponse = { publicationNames: publicationList.publicationNames, }; + console.log(response); return new Response(JSON.stringify(response)); } catch (e) { - console.log(e); + const message = await ParseFlowServiceErrorMessage(e); + console.log(message, e); } } diff --git a/ui/app/api/peers/route.ts b/ui/app/api/peers/route.ts index 4f468d8fb2..a503882ba0 100644 --- a/ui/app/api/peers/route.ts +++ b/ui/app/api/peers/route.ts @@ -21,7 +21,6 @@ import { } from '@/grpc_generated/peers'; import { CreatePeerRequest, - CreatePeerResponse, CreatePeerStatus, ValidatePeerRequest, ValidatePeerResponse, @@ -29,7 +28,10 @@ import { createPeerStatusFromJSON, validatePeerStatusFromJSON, } from '@/grpc_generated/route'; -import { GetFlowHttpAddressFromEnv } from '@/rpc/http'; +import { + GetFlowServiceHttpClient, + ParseFlowServiceErrorMessage, +} from '@/rpc/http'; const constructPeer = ( name: string, @@ -101,20 +103,16 @@ export const dynamic = 'force-dynamic'; export async function POST(request: Request) { const body = await request.json(); const { name, type, config, mode } = body; - const flowServiceAddr = GetFlowHttpAddressFromEnv(); + const flowServiceClient = GetFlowServiceHttpClient(); const peer = constructPeer(name, type, config); if (mode === 'validate') { const validateReq: ValidatePeerRequest = { peer }; try { - const validateStatus: ValidatePeerResponse = await fetch( - `${flowServiceAddr}/v1/peers/validate`, - { - method: 'POST', - body: JSON.stringify(validateReq), - } - ).then((res) => { - return res.json(); - }); + const validateStatus: ValidatePeerResponse = await flowServiceClient + .post(`/v1/peers/validate`, validateReq) + .then((res) => { + return res.data; + }); let response: UValidatePeerResponse = { valid: validatePeerStatusFromJSON(validateStatus.status) === @@ -123,20 +121,16 @@ export async function POST(request: Request) { }; return new Response(JSON.stringify(response)); } catch (error) { - console.error('Error validating peer:', error); + const message = await ParseFlowServiceErrorMessage(error); + console.error('Error validating peer:', message, error); } } else if (mode === 'create') { const req: CreatePeerRequest = { peer }; try { - const createStatus: CreatePeerResponse = await fetch( - `${flowServiceAddr}/v1/peers/create`, - { - method: 'POST', - body: JSON.stringify(req), - } - ).then((res) => { - return res.json(); - }); + const createStatus = await flowServiceClient.post( + `/v1/peers/create`, + req + ); let response: UCreatePeerResponse = { created: createPeerStatusFromJSON(createStatus.status) === @@ -145,7 +139,8 @@ export async function POST(request: Request) { }; return new Response(JSON.stringify(response)); } catch (error) { - console.error('Error creating peer:', error); + const message = await ParseFlowServiceErrorMessage(error); + console.error('Error creating peer:', message, error); } } } diff --git a/ui/app/api/peers/schemas/route.ts b/ui/app/api/peers/schemas/route.ts index 9a1c10856e..4d2b6aa8a8 100644 --- a/ui/app/api/peers/schemas/route.ts +++ b/ui/app/api/peers/schemas/route.ts @@ -1,21 +1,23 @@ import { USchemasResponse } from '@/app/dto/PeersDTO'; -import { GetFlowHttpAddressFromEnv } from '@/rpc/http'; +import { + GetFlowServiceHttpClient, + ParseFlowServiceErrorMessage, +} from '@/rpc/http'; export async function POST(request: Request) { const body = await request.json(); const { peerName } = body; - const flowServiceAddr = GetFlowHttpAddressFromEnv(); + const flowServiceClient = GetFlowServiceHttpClient(); try { - const schemaList = await fetch( - `${flowServiceAddr}/v1/peers/schemas?peer_name=${peerName}` - ).then((res) => { - return res.json(); - }); + const schemaList = await flowServiceClient.get( + `/v1/peers/schemas?peer_name=${peerName}` + ); let response: USchemasResponse = { schemas: schemaList.schemas, }; return new Response(JSON.stringify(response)); } catch (e) { - console.log(e); + const message = await ParseFlowServiceErrorMessage(e); + console.log(message, e); } } diff --git a/ui/app/api/peers/tables/all/route.ts b/ui/app/api/peers/tables/all/route.ts index 0281cc7067..021b5f4799 100644 --- a/ui/app/api/peers/tables/all/route.ts +++ b/ui/app/api/peers/tables/all/route.ts @@ -1,22 +1,24 @@ import { UTablesAllResponse } from '@/app/dto/PeersDTO'; import { AllTablesResponse } from '@/grpc_generated/route'; -import { GetFlowHttpAddressFromEnv } from '@/rpc/http'; +import { + GetFlowServiceHttpClient, + ParseFlowServiceErrorMessage, +} from '@/rpc/http'; export async function POST(request: Request) { const body = await request.json(); const { peerName } = body; - const flowServiceAddr = GetFlowHttpAddressFromEnv(); + const flowServiceClient = GetFlowServiceHttpClient(); try { - const tableList: AllTablesResponse = await fetch( - `${flowServiceAddr}/v1/peers/tables/all?peer_name=${peerName}` - ).then((res) => { - return res.json(); - }); + const tableList: AllTablesResponse = await flowServiceClient.get( + `/v1/peers/tables/all?peer_name=${peerName}` + ); let response: UTablesAllResponse = { tables: tableList.tables, }; return new Response(JSON.stringify(response)); } catch (e) { - console.log(e); + const message = await ParseFlowServiceErrorMessage(e); + console.error(message, e); } } diff --git a/ui/app/api/peers/tables/route.ts b/ui/app/api/peers/tables/route.ts index b4c73500dd..b8a3c3f842 100644 --- a/ui/app/api/peers/tables/route.ts +++ b/ui/app/api/peers/tables/route.ts @@ -1,22 +1,24 @@ import { UTablesResponse } from '@/app/dto/PeersDTO'; import { SchemaTablesResponse } from '@/grpc_generated/route'; -import { GetFlowHttpAddressFromEnv } from '@/rpc/http'; +import { + GetFlowServiceHttpClient, + ParseFlowServiceErrorMessage, +} from '@/rpc/http'; export async function POST(request: Request) { const body = await request.json(); const { peerName, schemaName } = body; - const flowServiceAddr = GetFlowHttpAddressFromEnv(); + const flowServiceClient = GetFlowServiceHttpClient(); try { - const tableList: SchemaTablesResponse = await fetch( - `${flowServiceAddr}/v1/peers/tables?peer_name=${peerName}&schema_name=${schemaName}` - ).then((res) => { - return res.json(); - }); + const tableList: SchemaTablesResponse = await flowServiceClient.get( + `/v1/peers/tables?peer_name=${peerName}&schema_name=${schemaName}` + ); let response: UTablesResponse = { tables: tableList.tables, }; return new Response(JSON.stringify(response)); } catch (e) { - console.log(e); + const message = await ParseFlowServiceErrorMessage(e); + console.log(message, e); } } diff --git a/ui/app/api/version/route.ts b/ui/app/api/version/route.ts index 1197ca2049..7371ba4269 100644 --- a/ui/app/api/version/route.ts +++ b/ui/app/api/version/route.ts @@ -1,23 +1,25 @@ -import { UVersionResponse } from '@/app/dto/VersionDTO'; -import { PeerDBVersionResponse } from '@/grpc_generated/route'; -import { GetFlowHttpAddressFromEnv } from '@/rpc/http'; +import { + GetFlowServiceHttpClient, + ParseFlowServiceErrorMessage, +} from '@/rpc/http'; export const dynamic = 'force-dynamic'; export async function GET() { - const flowServiceAddr = GetFlowHttpAddressFromEnv(); + const flowServiceClient = GetFlowServiceHttpClient(); try { - const versionResponse: PeerDBVersionResponse = await fetch( - `${flowServiceAddr}/v1/version` - ).then((res) => { - return res.json(); - }); - let response: UVersionResponse = { + const versionResponse = await flowServiceClient.get(`/v1/version`); + let response = { version: versionResponse.version, }; return new Response(JSON.stringify(response)); } catch (error) { - console.error('Error getting version:', error); - return new Response(JSON.stringify({ error: error })); + if (error instanceof Response) { + return error; + } + + const message = await ParseFlowServiceErrorMessage(error); + console.error('Error getting version:', message); + return new Response(JSON.stringify({ error: message }), { status: 500 }); } } diff --git a/ui/app/mirrors/[mirrorId]/edit/page.tsx b/ui/app/mirrors/[mirrorId]/edit/page.tsx index 9c738fb712..3a2f1df7e1 100644 --- a/ui/app/mirrors/[mirrorId]/edit/page.tsx +++ b/ui/app/mirrors/[mirrorId]/edit/page.tsx @@ -38,6 +38,7 @@ const EditMirror = ({ params: { mirrorId } }: EditMirrorProps) => { batchSize: defaultBatchSize, idleTimeout: defaultIdleTimeout, additionalTables: [], + numberOfSyncs: 0, }); const { push } = useRouter(); @@ -53,6 +54,7 @@ const EditMirror = ({ params: { mirrorId } }: EditMirrorProps) => { (res as MirrorStatusResponse).cdcStatus?.config?.idleTimeoutSeconds || defaultIdleTimeout, additionalTables: [], + numberOfSyncs: 0, }); }); }, [mirrorId, defaultBatchSize, defaultIdleTimeout]); diff --git a/ui/app/mirrors/[mirrorId]/page.tsx b/ui/app/mirrors/[mirrorId]/page.tsx index d7a78f9187..14f0dc1980 100644 --- a/ui/app/mirrors/[mirrorId]/page.tsx +++ b/ui/app/mirrors/[mirrorId]/page.tsx @@ -3,10 +3,12 @@ import prisma from '@/app/utils/prisma'; import MirrorActions from '@/components/MirrorActionsDropdown'; import { FlowConnectionConfigs, FlowStatus } from '@/grpc_generated/flow'; import { DBType } from '@/grpc_generated/peers'; -import { MirrorStatusResponse } from '@/grpc_generated/route'; import { Header } from '@/lib/Header'; import { LayoutMain } from '@/lib/Layout'; -import { GetFlowHttpAddressFromEnv } from '@/rpc/http'; +import { + GetFlowServiceHttpClient, + ParseFlowServiceErrorMessage, +} from '@/rpc/http'; import { redirect } from 'next/navigation'; import { CDCMirror } from './cdc'; import NoMirror from './nomirror'; @@ -17,21 +19,24 @@ type EditMirrorProps = { }; function getMirrorStatusUrl(mirrorId: string) { - let base = GetFlowHttpAddressFromEnv(); - return `${base}/v1/mirrors/${mirrorId}`; + return `/v1/mirrors/${mirrorId}?include_flow_info=true`; } async function getMirrorStatus(mirrorId: string) { const url = getMirrorStatusUrl(mirrorId); - const resp = await fetch(url, { cache: 'no-store' }); - const json = await resp.json(); - return json; + const flowServiceClient = GetFlowServiceHttpClient(); + try { + return await flowServiceClient.get(url, { cache: 'no-store' }); + } catch (e) { + const message = await ParseFlowServiceErrorMessage(e); + console.error(message, e); + } } export default async function ViewMirror({ params: { mirrorId }, }: EditMirrorProps) { - const mirrorStatus: MirrorStatusResponse = await getMirrorStatus(mirrorId); + const mirrorStatus = await getMirrorStatus(mirrorId); if (!mirrorStatus) { return
No mirror status found!
; } diff --git a/ui/app/mirrors/create/handlers.ts b/ui/app/mirrors/create/handlers.ts index 72ee1d5486..33ced06cb3 100644 --- a/ui/app/mirrors/create/handlers.ts +++ b/ui/app/mirrors/create/handlers.ts @@ -445,6 +445,7 @@ export const handleValidateCDC = async ( }; export const fetchPublications = async (peerName: string) => { + if (!peerName || peerName.length === 0) return []; const publicationsRes: UPublicationsResponse = await fetch( '/api/peers/publications', { diff --git a/ui/app/peers/[peerName]/page.tsx b/ui/app/peers/[peerName]/page.tsx index 5d5742c8cd..d267c75631 100644 --- a/ui/app/peers/[peerName]/page.tsx +++ b/ui/app/peers/[peerName]/page.tsx @@ -2,7 +2,10 @@ import { PeerInfo } from '@/components/PeerInfo'; import ReloadButton from '@/components/ReloadButton'; import { PeerSlotResponse, PeerStatResponse } from '@/grpc_generated/route'; import { Label } from '@/lib/Label'; -import { GetFlowHttpAddressFromEnv } from '@/rpc/http'; +import { + GetFlowServiceHttpClient, + ParseFlowServiceErrorMessage, +} from '@/rpc/http'; import LagGraph from './lagGraph'; import SlotTable from './slottable'; import StatTable from './stattable'; @@ -13,44 +16,50 @@ type DataConfigProps = { const PeerData = async ({ params: { peerName } }: DataConfigProps) => { const getSlotData = async () => { - const flowServiceAddr = GetFlowHttpAddressFromEnv(); - - const peerSlots: PeerSlotResponse = await fetch( - `${flowServiceAddr}/v1/peers/slots/${peerName}`, - { - cache: 'no-store', - } - ).then((res) => res.json()); - - const slotArray = peerSlots.slotData; - // slots with 'peerflow_slot' should come first - slotArray?.sort((slotA, slotB) => { - if ( - slotA.slotName.startsWith('peerflow_slot') && - !slotB.slotName.startsWith('peerflow_slot') - ) { - return -1; - } else if ( - !slotA.slotName.startsWith('peerflow_slot') && - slotB.slotName.startsWith('peerflow_slot') - ) { - return 1; - } else { - return 0; - } - }); - return slotArray; + const flowServiceClient = GetFlowServiceHttpClient(); + try { + const peerSlots: PeerSlotResponse = await flowServiceClient.get( + `/v1/peers/slots/${peerName}`, + { cache: 'no-store' } + ); + const slotArray = peerSlots.slotData; + // slots with 'peerflow_slot' should come first + slotArray?.sort((slotA, slotB) => { + if ( + slotA.slotName.startsWith('peerflow_slot') && + !slotB.slotName.startsWith('peerflow_slot') + ) { + return -1; + } else if ( + !slotA.slotName.startsWith('peerflow_slot') && + slotB.slotName.startsWith('peerflow_slot') + ) { + return 1; + } else { + return 0; + } + }); + return slotArray; + } catch (e) { + const message = await ParseFlowServiceErrorMessage(e); + console.error(message, e); + return []; + } }; const getStatData = async () => { - const flowServiceAddr = GetFlowHttpAddressFromEnv(); - - const peerStats: PeerStatResponse = await fetch( - `${flowServiceAddr}/v1/peers/stats/${peerName}`, - { cache: 'no-store' } - ).then((res) => res.json()); - - return peerStats.statData; + const flowServiceClient = GetFlowServiceHttpClient(); + try { + const peerStats: PeerStatResponse = await flowServiceClient.get( + `/v1/peers/stats/${peerName}`, + { cache: 'no-store' } + ); + return peerStats.statData; + } catch (e) { + const message = await ParseFlowServiceErrorMessage(e); + console.error(message, e); + return []; + } }; const slots = await getSlotData(); diff --git a/ui/app/peers/page.tsx b/ui/app/peers/page.tsx index 6ba3e68893..49b8f88e44 100644 --- a/ui/app/peers/page.tsx +++ b/ui/app/peers/page.tsx @@ -4,20 +4,19 @@ import { Icon } from '@/lib/Icon'; import { Label } from '@/lib/Label'; import { LayoutMain } from '@/lib/Layout'; import { Panel } from '@/lib/Panel'; +import { ProgressCircle } from '@/lib/ProgressCircle'; import Link from 'next/link'; import { Header } from '../../lib/Header'; import PeersTable from './peersTable'; -export const dynamic = 'force-dynamic'; - -import { ProgressCircle } from '@/lib/ProgressCircle'; import NewButton from '@/components/NewButton'; import useSWR from 'swr'; import { fetcher } from '../utils/swr'; +export const dynamic = 'force-dynamic'; + export default function Peers() { const { data: peers, error, isLoading } = useSWR('/api/peers', fetcher); - return ( diff --git a/ui/components/SidebarComponent.tsx b/ui/components/SidebarComponent.tsx index 5be2e128f6..48feba6c79 100644 --- a/ui/components/SidebarComponent.tsx +++ b/ui/components/SidebarComponent.tsx @@ -1,7 +1,6 @@ 'use client'; import { UVersionResponse } from '@/app/dto/VersionDTO'; -import { fetcher } from '@/app/utils/swr'; import Logout from '@/components/Logout'; import { BrandLogo } from '@/lib/BrandLogo'; import { Button } from '@/lib/Button'; @@ -29,9 +28,16 @@ export default function SidebarComponent() { const { data: version, isLoading, + error, }: { data: UVersionResponse; error: any; isLoading: boolean } = useSWR( '/api/version', - fetcher + async (url: string) => { + const res = await fetch(url); + if (!res.ok) { + throw new Error(); + } + return res.json(); + } ); const [sidebarState, setSidebarState] = useLocalStorage( @@ -114,7 +120,11 @@ export default function SidebarComponent() { ) : ( diff --git a/ui/rpc/http.ts b/ui/rpc/http.ts index c0d9b75542..786258f5c8 100644 --- a/ui/rpc/http.ts +++ b/ui/rpc/http.ts @@ -1,5 +1,65 @@ -import 'server-only'; +import { GetAuthorizationHeader } from '@/rpc/token'; export function GetFlowHttpAddressFromEnv() { return process.env.PEERDB_FLOW_SERVER_HTTP!; } + +function handleResponse(res: Response) { + if (!res.ok) { + throw res; + } + return res.json(); +} + +class Client { + baseUrl: string; + headers: Headers; + + constructor(baseUrl: string, headers: Headers) { + this.baseUrl = baseUrl; + this.headers = headers; + } + + raw(path: string, options?: { [key: string]: any }) { + return fetch(this.baseUrl + path, { headers: this.headers, ...options }); + } + + async get(path: string, options?: { [key: string]: any }) { + const res = await this.raw(path, options); + return handleResponse(res); + } + + async post(path: string, options?: { [key: string]: any }) { + const res = await this.raw(path, { + method: 'POST', + ...options, + }); + return handleResponse(res); + } +} + +const flowServiceHeaders = new Headers({ + 'Content-Type': 'application/json', +}); +if (GetAuthorizationHeader()) { + flowServiceHeaders.set('Authorization', GetAuthorizationHeader()); +} +const flowServiceHttpClient = new Client( + GetFlowHttpAddressFromEnv(), + flowServiceHeaders +); + +export function GetFlowServiceHttpClient() { + return flowServiceHttpClient; +} + +export async function ParseFlowServiceErrorMessage(error: any) { + if (error instanceof Response) { + const text = await error.text(); + return `${error.status} ${error.statusText} ${text}`; + } else if (error instanceof Error) { + return error.message; + } else { + return error; + } +} diff --git a/ui/rpc/token.ts b/ui/rpc/token.ts new file mode 100644 index 0000000000..85e22bd8fe --- /dev/null +++ b/ui/rpc/token.ts @@ -0,0 +1,9 @@ +import 'server-only'; + +export function GetAuthorizationHeader() { + const password = process.env.PEERDB_PASSWORD; + if (!password) { + return ''; + } + return `Bearer ${Buffer.from(password).toString('base64')}`; +}