Skip to content

Commit

Permalink
UI: Log errors on server side API calls (#803)
Browse files Browse the repository at this point in the history
For more visibility, this PR:
- Catches server side API call errors and logs them
- Ensures relevant GRPC APIs (create cdc, create qrep, drop mirror) are
well logged too
- All errors from the Create Peer and Validate Peer GRPC functions are
captured by the frontend so I'm not logging there for now
  • Loading branch information
Amogh-Bharadwaj authored Dec 12, 2023
1 parent 502bf0b commit 36cc29f
Show file tree
Hide file tree
Showing 10 changed files with 201 additions and 125 deletions.
36 changes: 36 additions & 0 deletions flow/cmd/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package main
import (
"context"
"fmt"
"log/slog"
"strconv"
"strings"
"time"
Expand Down Expand Up @@ -237,6 +238,8 @@ func (h *FlowRequestHandler) CreateQRepFlow(
if req.CreateCatalogEntry {
err := h.createQrepJobEntry(ctx, req, workflowID)
if err != nil {
slog.Error("unable to create flow job entry",
slog.Any("error", err), slog.String("flowName", cfg.FlowJobName))
return nil, fmt.Errorf("unable to create flow job entry: %w", err)
}
}
Expand All @@ -251,6 +254,8 @@ func (h *FlowRequestHandler) CreateQRepFlow(
// hack to facilitate migrating from existing xmin sync
txid, err := strconv.ParseInt(postColon, 10, 64)
if err != nil {
slog.Error("invalid xmin txid for xmin rep",
slog.Any("error", err), slog.String("flowName", cfg.FlowJobName))
return nil, fmt.Errorf("invalid xmin txid for xmin rep: %w", err)
}
state.LastPartition.Range = &protos.PartitionRange{Range: &protos.PartitionRange_IntRange{IntRange: &protos.IntPartitionRange{Start: txid}}}
Expand All @@ -262,11 +267,15 @@ func (h *FlowRequestHandler) CreateQRepFlow(
}
_, err := h.temporalClient.ExecuteWorkflow(ctx, workflowOptions, workflowFn, cfg, state)
if err != nil {
slog.Error("unable to start QRepFlow workflow",
slog.Any("error", err), slog.String("flowName", cfg.FlowJobName))
return nil, fmt.Errorf("unable to start QRepFlow workflow: %w", err)
}

err = h.updateQRepConfigInCatalog(cfg)
if err != nil {
slog.Error("unable to update qrep config in catalog",
slog.Any("error", err), slog.String("flowName", cfg.FlowJobName))
return nil, fmt.Errorf("unable to update qrep config in catalog: %w", err)
}

Expand Down Expand Up @@ -301,6 +310,10 @@ func (h *FlowRequestHandler) ShutdownFlow(
ctx context.Context,
req *protos.ShutdownRequest,
) (*protos.ShutdownResponse, error) {
logs := slog.Group("shutdown-log",
slog.String("flowName", req.FlowJobName),
slog.String("workflowId", req.WorkflowId),
)
err := h.temporalClient.SignalWorkflow(
ctx,
req.WorkflowId,
Expand All @@ -309,6 +322,10 @@ func (h *FlowRequestHandler) ShutdownFlow(
shared.ShutdownSignal,
)
if err != nil {
slog.Error("unable to signal PeerFlow workflow",
logs,
slog.Any("error", err),
)
return &protos.ShutdownResponse{
Ok: false,
ErrorMessage: fmt.Sprintf("unable to signal PeerFlow workflow: %v", err),
Expand All @@ -317,6 +334,10 @@ func (h *FlowRequestHandler) ShutdownFlow(

err = h.waitForWorkflowClose(ctx, req.WorkflowId)
if err != nil {
slog.Error("unable to wait for PeerFlow workflow to close",
logs,
slog.Any("error", err),
)
return &protos.ShutdownResponse{
Ok: false,
ErrorMessage: fmt.Sprintf("unable to wait for PeerFlow workflow to close: %v", err),
Expand All @@ -338,6 +359,9 @@ func (h *FlowRequestHandler) ShutdownFlow(
req, // workflow input
)
if err != nil {
slog.Error("unable to start DropFlow workflow",
logs,
slog.Any("error", err))
return &protos.ShutdownResponse{
Ok: false,
ErrorMessage: fmt.Sprintf("unable to start DropFlow workflow: %v", err),
Expand All @@ -355,6 +379,10 @@ func (h *FlowRequestHandler) ShutdownFlow(
select {
case err := <-errChan:
if err != nil {
slog.Error("DropFlow workflow did not execute successfully",
logs,
slog.Any("error", err),
)
return &protos.ShutdownResponse{
Ok: false,
ErrorMessage: fmt.Sprintf("DropFlow workflow did not execute successfully: %v", err),
Expand All @@ -363,6 +391,10 @@ func (h *FlowRequestHandler) ShutdownFlow(
case <-time.After(1 * time.Minute):
err := h.handleWorkflowNotClosed(ctx, workflowID, "")
if err != nil {
slog.Error("unable to wait for DropFlow workflow to close",
logs,
slog.Any("error", err),
)
return &protos.ShutdownResponse{
Ok: false,
ErrorMessage: fmt.Sprintf("unable to wait for DropFlow workflow to close: %v", err),
Expand All @@ -373,6 +405,10 @@ func (h *FlowRequestHandler) ShutdownFlow(
if req.RemoveFlowEntry {
delErr := h.removeFlowEntryInCatalog(req.FlowJobName)
if delErr != nil {
slog.Error("unable to remove flow job entry",
slog.String("flowName", req.FlowJobName),
slog.Any("error", err),
slog.String("workflowId", req.WorkflowId))
return &protos.ShutdownResponse{
Ok: false,
ErrorMessage: err.Error(),
Expand Down
30 changes: 17 additions & 13 deletions ui/app/api/mirrors/cdc/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,19 +14,23 @@ export async function POST(request: Request) {
connectionConfigs: config,
createCatalogEntry: true,
};
const createStatus: CreateCDCFlowResponse = await fetch(
`${flowServiceAddr}/v1/flows/cdc/create`,
{
method: 'POST',
body: JSON.stringify(req),
}
).then((res) => {
return res.json();
});
try {
const createStatus: CreateCDCFlowResponse = await fetch(
`${flowServiceAddr}/v1/flows/cdc/create`,
{
method: 'POST',
body: JSON.stringify(req),
}
).then((res) => {
return res.json();
});

let response: UCreateMirrorResponse = {
created: !!createStatus.worflowId,
};
let response: UCreateMirrorResponse = {
created: !!createStatus.worflowId,
};

return new Response(JSON.stringify(response));
return new Response(JSON.stringify(response));
} catch (e) {
console.log(e);
}
}
32 changes: 18 additions & 14 deletions ui/app/api/mirrors/drop/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,19 +14,23 @@ export async function POST(request: Request) {
removeFlowEntry: true,
};
console.log('/drop/mirror: req:', req);
const dropStatus: ShutdownResponse = await fetch(
`${flowServiceAddr}/v1/mirrors/drop`,
{
method: 'POST',
body: JSON.stringify(req),
}
).then((res) => {
return res.json();
});
let response: UDropMirrorResponse = {
dropped: dropStatus.ok,
errorMessage: dropStatus.errorMessage,
};
try {
const dropStatus: ShutdownResponse = await fetch(
`${flowServiceAddr}/v1/mirrors/drop`,
{
method: 'POST',
body: JSON.stringify(req),
}
).then((res) => {
return res.json();
});
let response: UDropMirrorResponse = {
dropped: dropStatus.ok,
errorMessage: dropStatus.errorMessage,
};

return new Response(JSON.stringify(response));
return new Response(JSON.stringify(response));
} catch (e) {
console.log(e);
}
}
30 changes: 17 additions & 13 deletions ui/app/api/mirrors/qrep/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,22 @@ export async function POST(request: Request) {
qrepConfig: config,
createCatalogEntry: true,
};
const createStatus: CreateQRepFlowResponse = await fetch(
`${flowServiceAddr}/v1/flows/qrep/create`,
{
method: 'POST',
body: JSON.stringify(req),
}
).then((res) => {
return res.json();
});
let response: UCreateMirrorResponse = {
created: !!createStatus.worflowId,
};
try {
const createStatus: CreateQRepFlowResponse = await fetch(
`${flowServiceAddr}/v1/flows/qrep/create`,
{
method: 'POST',
body: JSON.stringify(req),
}
).then((res) => {
return res.json();
});
let response: UCreateMirrorResponse = {
created: !!createStatus.worflowId,
};

return new Response(JSON.stringify(response));
return new Response(JSON.stringify(response));
} catch (e) {
console.log(e);
}
}
22 changes: 13 additions & 9 deletions ui/app/api/peers/columns/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,17 @@ export async function POST(request: Request) {
const body = await request.json();
const { peerName, schemaName, tableName } = body;
const flowServiceAddr = GetFlowHttpAddressFromEnv();
const columnsList: TableColumnsResponse = await fetch(
`${flowServiceAddr}/v1/peers/columns?peer_name=${peerName}&schema_name=${schemaName}&table_name=${tableName}`
).then((res) => {
return res.json();
});
let response: UColumnsResponse = {
columns: columnsList.columns,
};
return new Response(JSON.stringify(response));
try {
const columnsList: TableColumnsResponse = await fetch(
`${flowServiceAddr}/v1/peers/columns?peer_name=${peerName}&schema_name=${schemaName}&table_name=${tableName}`
).then((res) => {
return res.json();
});
let response: UColumnsResponse = {
columns: columnsList.columns,
};
return new Response(JSON.stringify(response));
} catch (e) {
console.log(e);
}
}
32 changes: 18 additions & 14 deletions ui/app/api/peers/drop/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,19 +10,23 @@ export async function POST(request: Request) {
peerName,
};
console.log('/drop/peer: req:', req);
const dropStatus: DropPeerResponse = await fetch(
`${flowServiceAddr}/v1/peers/drop`,
{
method: 'POST',
body: JSON.stringify(req),
}
).then((res) => {
return res.json();
});
let response: UDropPeerResponse = {
dropped: dropStatus.ok,
errorMessage: dropStatus.errorMessage,
};
try {
const dropStatus: DropPeerResponse = await fetch(
`${flowServiceAddr}/v1/peers/drop`,
{
method: 'POST',
body: JSON.stringify(req),
}
).then((res) => {
return res.json();
});
let response: UDropPeerResponse = {
dropped: dropStatus.ok,
errorMessage: dropStatus.errorMessage,
};

return new Response(JSON.stringify(response));
return new Response(JSON.stringify(response));
} catch (e) {
console.log(e);
}
}
72 changes: 40 additions & 32 deletions ui/app/api/peers/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -71,41 +71,49 @@ export async function POST(request: Request) {
const peer = constructPeer(name, type, config);
if (mode === 'validate') {
const validateReq: ValidatePeerRequest = { peer };
const validateStatus: ValidatePeerResponse = await fetch(
`${flowServiceAddr}/v1/peers/validate`,
{
method: 'POST',
body: JSON.stringify(validateReq),
}
).then((res) => {
return res.json();
});
let response: UValidatePeerResponse = {
valid:
validatePeerStatusFromJSON(validateStatus.status) ===
ValidatePeerStatus.VALID,
message: validateStatus.message,
};
return new Response(JSON.stringify(response));
try {
const validateStatus: ValidatePeerResponse = await fetch(
`${flowServiceAddr}/v1/peers/validate`,
{
method: 'POST',
body: JSON.stringify(validateReq),
}
).then((res) => {
return res.json();
});
let response: UValidatePeerResponse = {
valid:
validatePeerStatusFromJSON(validateStatus.status) ===
ValidatePeerStatus.VALID,
message: validateStatus.message,
};
return new Response(JSON.stringify(response));
} catch (error) {
console.error('Error validating peer:', error);
}
} else if (mode === 'create') {
const req: CreatePeerRequest = { peer };
console.log('/peer/create req:', req);
const createStatus: CreatePeerResponse = await fetch(
`${flowServiceAddr}/v1/peers/create`,
{
method: 'POST',
body: JSON.stringify(req),
}
).then((res) => {
return res.json();
});
let response: UCreatePeerResponse = {
created:
createPeerStatusFromJSON(createStatus.status) ===
CreatePeerStatus.CREATED,
message: createStatus.message,
};
return new Response(JSON.stringify(response));
try {
const createStatus: CreatePeerResponse = await fetch(
`${flowServiceAddr}/v1/peers/create`,
{
method: 'POST',
body: JSON.stringify(req),
}
).then((res) => {
return res.json();
});
let response: UCreatePeerResponse = {
created:
createPeerStatusFromJSON(createStatus.status) ===
CreatePeerStatus.CREATED,
message: createStatus.message,
};
return new Response(JSON.stringify(response));
} catch (error) {
console.error('Error creating peer:', error);
}
}
}

Expand Down
Loading

0 comments on commit 36cc29f

Please sign in to comment.