diff --git a/flow/cmd/handler.go b/flow/cmd/handler.go index d71f1e49a7..586c9811c3 100644 --- a/flow/cmd/handler.go +++ b/flow/cmd/handler.go @@ -3,6 +3,7 @@ package main import ( "context" "fmt" + "log/slog" "strconv" "strings" "time" @@ -237,6 +238,8 @@ func (h *FlowRequestHandler) CreateQRepFlow( if req.CreateCatalogEntry { err := h.createQrepJobEntry(ctx, req, workflowID) if err != nil { + slog.Error("unable to create flow job entry", + slog.Any("error", err), slog.String("flowName", cfg.FlowJobName)) return nil, fmt.Errorf("unable to create flow job entry: %w", err) } } @@ -251,6 +254,8 @@ func (h *FlowRequestHandler) CreateQRepFlow( // hack to facilitate migrating from existing xmin sync txid, err := strconv.ParseInt(postColon, 10, 64) if err != nil { + slog.Error("invalid xmin txid for xmin rep", + slog.Any("error", err), slog.String("flowName", cfg.FlowJobName)) return nil, fmt.Errorf("invalid xmin txid for xmin rep: %w", err) } state.LastPartition.Range = &protos.PartitionRange{Range: &protos.PartitionRange_IntRange{IntRange: &protos.IntPartitionRange{Start: txid}}} @@ -262,11 +267,15 @@ func (h *FlowRequestHandler) CreateQRepFlow( } _, err := h.temporalClient.ExecuteWorkflow(ctx, workflowOptions, workflowFn, cfg, state) if err != nil { + slog.Error("unable to start QRepFlow workflow", + slog.Any("error", err), slog.String("flowName", cfg.FlowJobName)) return nil, fmt.Errorf("unable to start QRepFlow workflow: %w", err) } err = h.updateQRepConfigInCatalog(cfg) if err != nil { + slog.Error("unable to update qrep config in catalog", + slog.Any("error", err), slog.String("flowName", cfg.FlowJobName)) return nil, fmt.Errorf("unable to update qrep config in catalog: %w", err) } @@ -301,6 +310,10 @@ func (h *FlowRequestHandler) ShutdownFlow( ctx context.Context, req *protos.ShutdownRequest, ) (*protos.ShutdownResponse, error) { + logs := slog.Group("shutdown-log", + slog.String("flowName", req.FlowJobName), + slog.String("workflowId", req.WorkflowId), + ) err := h.temporalClient.SignalWorkflow( ctx, req.WorkflowId, @@ -309,6 +322,10 @@ func (h *FlowRequestHandler) ShutdownFlow( shared.ShutdownSignal, ) if err != nil { + slog.Error("unable to signal PeerFlow workflow", + logs, + slog.Any("error", err), + ) return &protos.ShutdownResponse{ Ok: false, ErrorMessage: fmt.Sprintf("unable to signal PeerFlow workflow: %v", err), @@ -317,6 +334,10 @@ func (h *FlowRequestHandler) ShutdownFlow( err = h.waitForWorkflowClose(ctx, req.WorkflowId) if err != nil { + slog.Error("unable to wait for PeerFlow workflow to close", + logs, + slog.Any("error", err), + ) return &protos.ShutdownResponse{ Ok: false, ErrorMessage: fmt.Sprintf("unable to wait for PeerFlow workflow to close: %v", err), @@ -338,6 +359,9 @@ func (h *FlowRequestHandler) ShutdownFlow( req, // workflow input ) if err != nil { + slog.Error("unable to start DropFlow workflow", + logs, + slog.Any("error", err)) return &protos.ShutdownResponse{ Ok: false, ErrorMessage: fmt.Sprintf("unable to start DropFlow workflow: %v", err), @@ -355,6 +379,10 @@ func (h *FlowRequestHandler) ShutdownFlow( select { case err := <-errChan: if err != nil { + slog.Error("DropFlow workflow did not execute successfully", + logs, + slog.Any("error", err), + ) return &protos.ShutdownResponse{ Ok: false, ErrorMessage: fmt.Sprintf("DropFlow workflow did not execute successfully: %v", err), @@ -363,6 +391,10 @@ func (h *FlowRequestHandler) ShutdownFlow( case <-time.After(1 * time.Minute): err := h.handleWorkflowNotClosed(ctx, workflowID, "") if err != nil { + slog.Error("unable to wait for DropFlow workflow to close", + logs, + slog.Any("error", err), + ) return &protos.ShutdownResponse{ Ok: false, ErrorMessage: fmt.Sprintf("unable to wait for DropFlow workflow to close: %v", err), @@ -373,6 +405,10 @@ func (h *FlowRequestHandler) ShutdownFlow( if req.RemoveFlowEntry { delErr := h.removeFlowEntryInCatalog(req.FlowJobName) if delErr != nil { + slog.Error("unable to remove flow job entry", + slog.String("flowName", req.FlowJobName), + slog.Any("error", err), + slog.String("workflowId", req.WorkflowId)) return &protos.ShutdownResponse{ Ok: false, ErrorMessage: err.Error(), diff --git a/ui/app/api/mirrors/cdc/route.ts b/ui/app/api/mirrors/cdc/route.ts index 025dfc149a..12efa4a826 100644 --- a/ui/app/api/mirrors/cdc/route.ts +++ b/ui/app/api/mirrors/cdc/route.ts @@ -14,19 +14,23 @@ export async function POST(request: Request) { connectionConfigs: config, createCatalogEntry: true, }; - const createStatus: CreateCDCFlowResponse = await fetch( - `${flowServiceAddr}/v1/flows/cdc/create`, - { - method: 'POST', - body: JSON.stringify(req), - } - ).then((res) => { - return res.json(); - }); + try { + const createStatus: CreateCDCFlowResponse = await fetch( + `${flowServiceAddr}/v1/flows/cdc/create`, + { + method: 'POST', + body: JSON.stringify(req), + } + ).then((res) => { + return res.json(); + }); - let response: UCreateMirrorResponse = { - created: !!createStatus.worflowId, - }; + let response: UCreateMirrorResponse = { + created: !!createStatus.worflowId, + }; - return new Response(JSON.stringify(response)); + return new Response(JSON.stringify(response)); + } catch (e) { + console.log(e); + } } diff --git a/ui/app/api/mirrors/drop/route.ts b/ui/app/api/mirrors/drop/route.ts index 6652bf5897..e3be0f7c41 100644 --- a/ui/app/api/mirrors/drop/route.ts +++ b/ui/app/api/mirrors/drop/route.ts @@ -14,19 +14,23 @@ export async function POST(request: Request) { removeFlowEntry: true, }; console.log('/drop/mirror: req:', req); - const dropStatus: ShutdownResponse = await fetch( - `${flowServiceAddr}/v1/mirrors/drop`, - { - method: 'POST', - body: JSON.stringify(req), - } - ).then((res) => { - return res.json(); - }); - let response: UDropMirrorResponse = { - dropped: dropStatus.ok, - errorMessage: dropStatus.errorMessage, - }; + try { + const dropStatus: ShutdownResponse = await fetch( + `${flowServiceAddr}/v1/mirrors/drop`, + { + method: 'POST', + body: JSON.stringify(req), + } + ).then((res) => { + return res.json(); + }); + let response: UDropMirrorResponse = { + dropped: dropStatus.ok, + errorMessage: dropStatus.errorMessage, + }; - return new Response(JSON.stringify(response)); + return new Response(JSON.stringify(response)); + } catch (e) { + console.log(e); + } } diff --git a/ui/app/api/mirrors/qrep/route.ts b/ui/app/api/mirrors/qrep/route.ts index ff160e703e..6f199dc53f 100644 --- a/ui/app/api/mirrors/qrep/route.ts +++ b/ui/app/api/mirrors/qrep/route.ts @@ -14,18 +14,22 @@ export async function POST(request: Request) { qrepConfig: config, createCatalogEntry: true, }; - const createStatus: CreateQRepFlowResponse = await fetch( - `${flowServiceAddr}/v1/flows/qrep/create`, - { - method: 'POST', - body: JSON.stringify(req), - } - ).then((res) => { - return res.json(); - }); - let response: UCreateMirrorResponse = { - created: !!createStatus.worflowId, - }; + try { + const createStatus: CreateQRepFlowResponse = await fetch( + `${flowServiceAddr}/v1/flows/qrep/create`, + { + method: 'POST', + body: JSON.stringify(req), + } + ).then((res) => { + return res.json(); + }); + let response: UCreateMirrorResponse = { + created: !!createStatus.worflowId, + }; - return new Response(JSON.stringify(response)); + return new Response(JSON.stringify(response)); + } catch (e) { + console.log(e); + } } diff --git a/ui/app/api/peers/columns/route.ts b/ui/app/api/peers/columns/route.ts index ce944907fb..9f45db092e 100644 --- a/ui/app/api/peers/columns/route.ts +++ b/ui/app/api/peers/columns/route.ts @@ -6,13 +6,17 @@ export async function POST(request: Request) { const body = await request.json(); const { peerName, schemaName, tableName } = body; const flowServiceAddr = GetFlowHttpAddressFromEnv(); - const columnsList: TableColumnsResponse = await fetch( - `${flowServiceAddr}/v1/peers/columns?peer_name=${peerName}&schema_name=${schemaName}&table_name=${tableName}` - ).then((res) => { - return res.json(); - }); - let response: UColumnsResponse = { - columns: columnsList.columns, - }; - return new Response(JSON.stringify(response)); + try { + const columnsList: TableColumnsResponse = await fetch( + `${flowServiceAddr}/v1/peers/columns?peer_name=${peerName}&schema_name=${schemaName}&table_name=${tableName}` + ).then((res) => { + return res.json(); + }); + let response: UColumnsResponse = { + columns: columnsList.columns, + }; + return new Response(JSON.stringify(response)); + } catch (e) { + console.log(e); + } } diff --git a/ui/app/api/peers/drop/route.ts b/ui/app/api/peers/drop/route.ts index 0a7eb25418..7449480033 100644 --- a/ui/app/api/peers/drop/route.ts +++ b/ui/app/api/peers/drop/route.ts @@ -10,19 +10,23 @@ export async function POST(request: Request) { peerName, }; console.log('/drop/peer: req:', req); - const dropStatus: DropPeerResponse = await fetch( - `${flowServiceAddr}/v1/peers/drop`, - { - method: 'POST', - body: JSON.stringify(req), - } - ).then((res) => { - return res.json(); - }); - let response: UDropPeerResponse = { - dropped: dropStatus.ok, - errorMessage: dropStatus.errorMessage, - }; + try { + const dropStatus: DropPeerResponse = await fetch( + `${flowServiceAddr}/v1/peers/drop`, + { + method: 'POST', + body: JSON.stringify(req), + } + ).then((res) => { + return res.json(); + }); + let response: UDropPeerResponse = { + dropped: dropStatus.ok, + errorMessage: dropStatus.errorMessage, + }; - return new Response(JSON.stringify(response)); + return new Response(JSON.stringify(response)); + } catch (e) { + console.log(e); + } } diff --git a/ui/app/api/peers/route.ts b/ui/app/api/peers/route.ts index 8580014956..c865979efe 100644 --- a/ui/app/api/peers/route.ts +++ b/ui/app/api/peers/route.ts @@ -71,41 +71,49 @@ export async function POST(request: Request) { const peer = constructPeer(name, type, config); if (mode === 'validate') { const validateReq: ValidatePeerRequest = { peer }; - const validateStatus: ValidatePeerResponse = await fetch( - `${flowServiceAddr}/v1/peers/validate`, - { - method: 'POST', - body: JSON.stringify(validateReq), - } - ).then((res) => { - return res.json(); - }); - let response: UValidatePeerResponse = { - valid: - validatePeerStatusFromJSON(validateStatus.status) === - ValidatePeerStatus.VALID, - message: validateStatus.message, - }; - return new Response(JSON.stringify(response)); + try { + const validateStatus: ValidatePeerResponse = await fetch( + `${flowServiceAddr}/v1/peers/validate`, + { + method: 'POST', + body: JSON.stringify(validateReq), + } + ).then((res) => { + return res.json(); + }); + let response: UValidatePeerResponse = { + valid: + validatePeerStatusFromJSON(validateStatus.status) === + ValidatePeerStatus.VALID, + message: validateStatus.message, + }; + return new Response(JSON.stringify(response)); + } catch (error) { + console.error('Error validating peer:', error); + } } else if (mode === 'create') { const req: CreatePeerRequest = { peer }; console.log('/peer/create req:', req); - const createStatus: CreatePeerResponse = await fetch( - `${flowServiceAddr}/v1/peers/create`, - { - method: 'POST', - body: JSON.stringify(req), - } - ).then((res) => { - return res.json(); - }); - let response: UCreatePeerResponse = { - created: - createPeerStatusFromJSON(createStatus.status) === - CreatePeerStatus.CREATED, - message: createStatus.message, - }; - return new Response(JSON.stringify(response)); + try { + const createStatus: CreatePeerResponse = await fetch( + `${flowServiceAddr}/v1/peers/create`, + { + method: 'POST', + body: JSON.stringify(req), + } + ).then((res) => { + return res.json(); + }); + let response: UCreatePeerResponse = { + created: + createPeerStatusFromJSON(createStatus.status) === + CreatePeerStatus.CREATED, + message: createStatus.message, + }; + return new Response(JSON.stringify(response)); + } catch (error) { + console.error('Error creating peer:', error); + } } } diff --git a/ui/app/api/peers/schemas/route.ts b/ui/app/api/peers/schemas/route.ts index 0848692702..0c701cb713 100644 --- a/ui/app/api/peers/schemas/route.ts +++ b/ui/app/api/peers/schemas/route.ts @@ -5,18 +5,22 @@ export async function POST(request: Request) { const body = await request.json(); const { peerName } = body; const flowServiceAddr = GetFlowHttpAddressFromEnv(); - const schemaList = await fetch( - `${flowServiceAddr}/v1/peers/schemas?peer_name=${peerName}` - ).then((res) => { - return res.json(); - }); - let response: USchemasResponse = { - schemas: schemaList.schemas, - }; - if (schemaList.message === 'no rows in result set') { - response = { - schemas: [], + try { + const schemaList = await fetch( + `${flowServiceAddr}/v1/peers/schemas?peer_name=${peerName}` + ).then((res) => { + return res.json(); + }); + let response: USchemasResponse = { + schemas: schemaList.schemas, }; + if (schemaList.message === 'no rows in result set') { + response = { + schemas: [], + }; + } + return new Response(JSON.stringify(response)); + } catch (e) { + console.log(e); } - return new Response(JSON.stringify(response)); } diff --git a/ui/app/api/peers/tables/all/route.ts b/ui/app/api/peers/tables/all/route.ts index c372b41a32..92223f7bbf 100644 --- a/ui/app/api/peers/tables/all/route.ts +++ b/ui/app/api/peers/tables/all/route.ts @@ -6,13 +6,17 @@ export async function POST(request: Request) { const body = await request.json(); const { peerName } = body; const flowServiceAddr = GetFlowHttpAddressFromEnv(); - const tableList: SchemaTablesResponse = await fetch( - `${flowServiceAddr}/v1/peers/tables/all?peer_name=${peerName}` - ).then((res) => { - return res.json(); - }); - let response: UTablesResponse = { - tables: tableList.tables, - }; - return new Response(JSON.stringify(response)); + try { + const tableList: SchemaTablesResponse = await fetch( + `${flowServiceAddr}/v1/peers/tables/all?peer_name=${peerName}` + ).then((res) => { + return res.json(); + }); + let response: UTablesResponse = { + tables: tableList.tables, + }; + return new Response(JSON.stringify(response)); + } catch (e) { + console.log(e); + } } diff --git a/ui/app/api/peers/tables/route.ts b/ui/app/api/peers/tables/route.ts index 53f606d48c..b4c73500dd 100644 --- a/ui/app/api/peers/tables/route.ts +++ b/ui/app/api/peers/tables/route.ts @@ -6,13 +6,17 @@ export async function POST(request: Request) { const body = await request.json(); const { peerName, schemaName } = body; const flowServiceAddr = GetFlowHttpAddressFromEnv(); - const tableList: SchemaTablesResponse = await fetch( - `${flowServiceAddr}/v1/peers/tables?peer_name=${peerName}&schema_name=${schemaName}` - ).then((res) => { - return res.json(); - }); - let response: UTablesResponse = { - tables: tableList.tables, - }; - return new Response(JSON.stringify(response)); + try { + const tableList: SchemaTablesResponse = await fetch( + `${flowServiceAddr}/v1/peers/tables?peer_name=${peerName}&schema_name=${schemaName}` + ).then((res) => { + return res.json(); + }); + let response: UTablesResponse = { + tables: tableList.tables, + }; + return new Response(JSON.stringify(response)); + } catch (e) { + console.log(e); + } }