Skip to content

Commit

Permalink
Merge branch 'main' into exclude-columns
Browse files Browse the repository at this point in the history
  • Loading branch information
serprex authored Nov 2, 2023
2 parents b580541 + f0c8e6f commit 731e9fc
Show file tree
Hide file tree
Showing 23 changed files with 1,715 additions and 697 deletions.
99 changes: 93 additions & 6 deletions flow/cmd/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,19 @@ func (h *FlowRequestHandler) updateFlowConfigInCatalog(
return nil
}

func (h *FlowRequestHandler) removeFlowEntryInCatalog(
flowName string,
) error {
_, err := h.pool.Exec(context.Background(),
"DELETE FROM flows WHERE name = $1",
flowName)
if err != nil {
return fmt.Errorf("unable to remove flow entry in catalog: %w", err)
}

return nil
}

func (h *FlowRequestHandler) CreateQRepFlow(
ctx context.Context, req *protos.CreateQRepFlowRequest) (*protos.CreateQRepFlowResponse, error) {
lastPartition := &protos.QRepPartition{
Expand All @@ -201,7 +214,6 @@ func (h *FlowRequestHandler) CreateQRepFlow(
}

cfg := req.QrepConfig
log.Infof("Config for QRepFlow: %+v", cfg)
workflowID := fmt.Sprintf("%s-qrepflow-%s", cfg.FlowJobName, uuid.New())
workflowOptions := client.StartWorkflowOptions{
ID: workflowID,
Expand Down Expand Up @@ -270,12 +282,18 @@ func (h *FlowRequestHandler) ShutdownFlow(
shared.ShutdownSignal,
)
if err != nil {
return nil, fmt.Errorf("unable to signal PeerFlow workflow: %w", err)
return &protos.ShutdownResponse{
Ok: false,
ErrorMessage: fmt.Sprintf("unable to signal PeerFlow workflow: %v", err),
}, fmt.Errorf("unable to signal PeerFlow workflow: %w", err)
}

err = h.waitForWorkflowClose(ctx, req.WorkflowId)
if err != nil {
return nil, fmt.Errorf("unable to wait for PeerFlow workflow to close: %w", err)
return &protos.ShutdownResponse{
Ok: false,
ErrorMessage: fmt.Sprintf("unable to wait for PeerFlow workflow to close: %v", err),
}, fmt.Errorf("unable to wait for PeerFlow workflow to close: %w", err)
}

workflowID := fmt.Sprintf("%s-dropflow-%s", req.FlowJobName, uuid.New())
Expand All @@ -290,7 +308,10 @@ func (h *FlowRequestHandler) ShutdownFlow(
req, // workflow input
)
if err != nil {
return nil, fmt.Errorf("unable to start DropFlow workflow: %w", err)
return &protos.ShutdownResponse{
Ok: false,
ErrorMessage: fmt.Sprintf("unable to start DropFlow workflow: %v", err),
}, fmt.Errorf("unable to start DropFlow workflow: %w", err)
}

cancelCtx, cancel := context.WithTimeout(ctx, 2*time.Minute)
Expand All @@ -304,15 +325,29 @@ func (h *FlowRequestHandler) ShutdownFlow(
select {
case err := <-errChan:
if err != nil {
return nil, fmt.Errorf("DropFlow workflow did not execute successfully: %w", err)
return &protos.ShutdownResponse{
Ok: false,
ErrorMessage: fmt.Sprintf("DropFlow workflow did not execute successfully: %v", err),
}, fmt.Errorf("DropFlow workflow did not execute successfully: %w", err)
}
case <-time.After(1 * time.Minute):
err := h.handleWorkflowNotClosed(ctx, workflowID, "")
if err != nil {
return nil, fmt.Errorf("unable to wait for DropFlow workflow to close: %w", err)
return &protos.ShutdownResponse{
Ok: false,
ErrorMessage: fmt.Sprintf("unable to wait for DropFlow workflow to close: %v", err),
}, fmt.Errorf("unable to wait for DropFlow workflow to close: %w", err)
}
}

delErr := h.removeFlowEntryInCatalog(req.FlowJobName)
if delErr != nil {
return &protos.ShutdownResponse{
Ok: false,
ErrorMessage: err.Error(),
}, err
}

return &protos.ShutdownResponse{
Ok: true,
}, nil
Expand Down Expand Up @@ -501,3 +536,55 @@ func (h *FlowRequestHandler) CreatePeer(
Message: "",
}, nil
}

func (h *FlowRequestHandler) DropPeer(
ctx context.Context,
req *protos.DropPeerRequest,
) (*protos.DropPeerResponse, error) {
if req.PeerName == "" {
return &protos.DropPeerResponse{
Ok: false,
ErrorMessage: fmt.Sprintf("Peer %s not found", req.PeerName),
}, fmt.Errorf("peer %s not found", req.PeerName)
}

// Check if peer name is in flows table
peerID, _, err := h.getPeerID(ctx, req.PeerName)
if err != nil {
return &protos.DropPeerResponse{
Ok: false,
ErrorMessage: fmt.Sprintf("Failed to obtain peer ID for peer %s: %v", req.PeerName, err),
}, fmt.Errorf("failed to obtain peer ID for peer %s: %v", req.PeerName, err)
}

var inMirror int64
queryErr := h.pool.QueryRow(ctx,
"SELECT COUNT(*) FROM flows WHERE source_peer=$1 or destination_peer=$2",
peerID, peerID).Scan(&inMirror)
if queryErr != nil {
return &protos.DropPeerResponse{
Ok: false,
ErrorMessage: fmt.Sprintf("Failed to check for existing mirrors with peer %s: %v", req.PeerName, queryErr),
}, fmt.Errorf("failed to check for existing mirrors with peer %s", req.PeerName)
}

if inMirror != 0 {
return &protos.DropPeerResponse{
Ok: false,
ErrorMessage: fmt.Sprintf("Peer %s is currently involved in an ongoing mirror.", req.PeerName),
}, nil
}

_, delErr := h.pool.Exec(ctx, "DELETE FROM peers WHERE name = $1", req.PeerName)
if delErr != nil {
return &protos.DropPeerResponse{
Ok: false,
ErrorMessage: fmt.Sprintf("failed to delete peer %s from metadata table: %v", req.PeerName, delErr),
}, fmt.Errorf("failed to delete peer %s from metadata table: %v", req.PeerName, delErr)
}

return &protos.DropPeerResponse{
Ok: true,
}, nil

}
Loading

0 comments on commit 731e9fc

Please sign in to comment.