Skip to content

Commit

Permalink
drop flow refactor (#2208)
Browse files Browse the repository at this point in the history
move flow entries removal to an activity, and make it before dropping
source and destination as that can fail and the workflow will be
cancelled
  • Loading branch information
heavycrystal authored Oct 31, 2024
1 parent 9b4d837 commit 4a6bd86
Show file tree
Hide file tree
Showing 4 changed files with 125 additions and 98 deletions.
31 changes: 31 additions & 0 deletions flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -1065,3 +1065,34 @@ func (a *FlowableActivity) RemoveTablesFromCatalog(

return err
}

func (a *FlowableActivity) RemoveFlowEntryFromCatalog(ctx context.Context, flowName string) error {
logger := log.With(activity.GetLogger(ctx),
slog.String(string(shared.FlowNameKey), flowName))
tx, err := a.CatalogPool.Begin(ctx)
if err != nil {
return fmt.Errorf("failed to begin transaction to remove flow entries from catalog: %w", err)
}
defer shared.RollbackTx(tx, slog.Default())

if _, err := tx.Exec(ctx, "DELETE FROM table_schema_mapping WHERE flow_name=$1", flowName); err != nil {
return fmt.Errorf("unable to clear table_schema_mapping in catalog: %w", err)
}

ct, err := tx.Exec(ctx, "DELETE FROM flows WHERE name=$1", flowName)
if err != nil {
return fmt.Errorf("unable to remove flow entry in catalog: %w", err)
}
if ct.RowsAffected() == 0 {
logger.Warn("flow entry not found in catalog, 0 records deleted")
} else {
logger.Info("flow entries removed from catalog",
slog.Int("rowsAffected", int(ct.RowsAffected())))
}

if err := tx.Commit(ctx); err != nil {
return fmt.Errorf("failed to commit transaction to remove flow entries from catalog: %w", err)
}

return nil
}
103 changes: 37 additions & 66 deletions flow/cmd/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,31 +175,6 @@ func (h *FlowRequestHandler) updateFlowConfigInCatalog(
return shared.UpdateCDCConfigInCatalog(ctx, h.pool, slog.Default(), cfg)
}

func (h *FlowRequestHandler) removeFlowEntryInCatalog(
ctx context.Context,
flowName string,
) error {
tx, err := h.pool.Begin(ctx)
if err != nil {
return fmt.Errorf("unable to begin tx to remove flow entry in catalog: %w", err)
}
defer shared.RollbackTx(tx, slog.Default())

if _, err := tx.Exec(ctx, "DELETE FROM table_schema_mapping WHERE flow_name=$1", flowName); err != nil {
return fmt.Errorf("unable to clear table_schema_mapping to remove flow entry in catalog: %w", err)
}

if _, err := tx.Exec(ctx, "DELETE FROM flows WHERE name=$1", flowName); err != nil {
return fmt.Errorf("unable to remove flow entry in catalog: %w", err)
}

if err := tx.Commit(ctx); err != nil {
return fmt.Errorf("unable to commit remove flow entry in catalog: %w", err)
}

return nil
}

func (h *FlowRequestHandler) CreateQRepFlow(
ctx context.Context, req *protos.CreateQRepFlowRequest,
) (*protos.CreateQRepFlowResponse, error) {
Expand Down Expand Up @@ -295,56 +270,52 @@ func (h *FlowRequestHandler) shutdownFlow(
if err != nil {
slog.Error("unable to check if workflow is cdc", logs, slog.Any("error", err))
return fmt.Errorf("unable to determine if workflow is cdc: %w", err)
} else if isCdc {
cdcConfig, err := h.getFlowConfigFromCatalog(ctx, flowJobName)
}
var cdcConfig *protos.FlowConnectionConfigs
if isCdc {
cdcConfig, err = h.getFlowConfigFromCatalog(ctx, flowJobName)
if err != nil {
slog.Error("unable to get cdc config from catalog", logs, slog.Any("error", err))
return fmt.Errorf("unable to get cdc config from catalog: %w", err)
}
workflowID := fmt.Sprintf("%s-dropflow-%s", flowJobName, uuid.New())
workflowOptions := client.StartWorkflowOptions{
ID: workflowID,
TaskQueue: h.peerflowTaskQueueID,
TypedSearchAttributes: shared.NewSearchAttributes(flowJobName),
}
}
dropFlowWorkflowID := fmt.Sprintf("%s-dropflow-%s", flowJobName, uuid.New())
workflowOptions := client.StartWorkflowOptions{
ID: dropFlowWorkflowID,
TaskQueue: h.peerflowTaskQueueID,
TypedSearchAttributes: shared.NewSearchAttributes(flowJobName),
}

dropFlowHandle, err := h.temporalClient.ExecuteWorkflow(ctx, workflowOptions,
peerflow.DropFlowWorkflow, &protos.DropFlowInput{
FlowJobName: flowJobName,
SourcePeerName: cdcConfig.SourceName,
DestinationPeerName: cdcConfig.DestinationName,
DropFlowStats: deleteStats,
})
if err != nil {
slog.Error("unable to start DropFlow workflow", logs, slog.Any("error", err))
return fmt.Errorf("unable to start DropFlow workflow: %w", err)
}
dropFlowHandle, err := h.temporalClient.ExecuteWorkflow(ctx, workflowOptions,
peerflow.DropFlowWorkflow, &protos.DropFlowInput{
FlowJobName: flowJobName,
DropFlowStats: deleteStats,
FlowConnectionConfigs: cdcConfig,
})
if err != nil {
slog.Error("unable to start DropFlow workflow", logs, slog.Any("error", err))
return fmt.Errorf("unable to start DropFlow workflow: %w", err)
}

cancelCtx, cancel := context.WithTimeout(ctx, 2*time.Minute)
defer cancel()
cancelCtx, cancel := context.WithTimeout(ctx, 5*time.Minute)
defer cancel()

errChan := make(chan error, 1)
go func() {
errChan <- dropFlowHandle.Get(cancelCtx, nil)
}()
errChan := make(chan error, 1)
go func() {
errChan <- dropFlowHandle.Get(cancelCtx, nil)
}()

select {
case err := <-errChan:
if err != nil {
slog.Error("DropFlow workflow did not execute successfully", logs, slog.Any("error", err))
return fmt.Errorf("DropFlow workflow did not execute successfully: %w", err)
}
case <-time.After(5 * time.Minute):
if err := h.handleCancelWorkflow(ctx, workflowID, ""); err != nil {
slog.Error("unable to wait for DropFlow workflow to close", logs, slog.Any("error", err))
return fmt.Errorf("unable to wait for DropFlow workflow to close: %w", err)
}
select {
case err := <-errChan:
if err != nil {
slog.Error("DropFlow workflow did not execute successfully", logs, slog.Any("error", err))
return fmt.Errorf("DropFlow workflow did not execute successfully: %w", err)
}
case <-time.After(5 * time.Minute):
if err := h.handleCancelWorkflow(ctx, workflowID, ""); err != nil {
slog.Error("unable to wait for DropFlow workflow to close", logs, slog.Any("error", err))
return fmt.Errorf("unable to wait for DropFlow workflow to close: %w", err)
}
}

if err := h.removeFlowEntryInCatalog(ctx, flowJobName); err != nil {
slog.Error("unable to remove flow job entry", logs, slog.Any("error", err))
return err
}

return nil
Expand Down
85 changes: 55 additions & 30 deletions flow/workflows/drop_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,35 +12,27 @@ import (
"github.com/PeerDB-io/peer-flow/shared"
)

func DropFlowWorkflow(ctx workflow.Context, config *protos.DropFlowInput) error {
workflow.GetLogger(ctx).Info("performing cleanup for flow", slog.String(string(shared.FlowNameKey), config.FlowJobName))

func executeCDCDropActivities(ctx workflow.Context, input *protos.DropFlowInput) error {
ctx = workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
StartToCloseTimeout: 5 * time.Minute,
})
ctx = workflow.WithValue(ctx, shared.FlowNameKey, config.FlowJobName)
ctx = workflow.WithDataConverter(ctx, converter.NewCompositeDataConverter(converter.NewJSONPayloadConverter()))

dropStatsCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
StartToCloseTimeout: 5 * time.Minute,
HeartbeatTimeout: 1 * time.Minute,
})

var sourceError, destinationError error
var sourceOk, destinationOk, canceled bool
selector := workflow.NewNamedSelector(ctx, config.FlowJobName+"-drop")
selector := workflow.NewNamedSelector(ctx, input.FlowJobName+"-drop")
selector.AddReceive(ctx.Done(), func(_ workflow.ReceiveChannel, _ bool) {
canceled = true
})

var dropSource, dropDestination, dropStats func(f workflow.Future)
var dropSource, dropDestination func(f workflow.Future)
dropSource = func(f workflow.Future) {
sourceError = f.Get(ctx, nil)
sourceOk = sourceError == nil
if !sourceOk {
dropSourceFuture := workflow.ExecuteActivity(ctx, flowable.DropFlowSource, &protos.DropFlowActivityInput{
FlowJobName: config.FlowJobName,
PeerName: config.SourcePeerName,
FlowJobName: input.FlowJobName,
PeerName: input.FlowConnectionConfigs.SourceName,
})
selector.AddFuture(dropSourceFuture, dropSource)
_ = workflow.Sleep(ctx, time.Second)
Expand All @@ -51,34 +43,25 @@ func DropFlowWorkflow(ctx workflow.Context, config *protos.DropFlowInput) error
destinationOk = destinationError == nil
if !destinationOk {
dropDestinationFuture := workflow.ExecuteActivity(ctx, flowable.DropFlowDestination, &protos.DropFlowActivityInput{
FlowJobName: config.FlowJobName,
PeerName: config.DestinationPeerName,
FlowJobName: input.FlowJobName,
PeerName: input.FlowConnectionConfigs.DestinationName,
})
selector.AddFuture(dropDestinationFuture, dropDestination)
_ = workflow.Sleep(ctx, time.Second)
}
}
dropStats = func(f workflow.Future) {
statsError := f.Get(dropStatsCtx, nil)
if statsError != nil {
// not fatal
workflow.GetLogger(ctx).Warn("failed to delete mirror stats", slog.Any("error", statsError))
}
}

dropSourceFuture := workflow.ExecuteActivity(ctx, flowable.DropFlowSource, &protos.DropFlowActivityInput{
FlowJobName: config.FlowJobName,
PeerName: config.SourcePeerName,
FlowJobName: input.FlowJobName,
PeerName: input.FlowConnectionConfigs.SourceName,
})
selector.AddFuture(dropSourceFuture, dropSource)
dropDestinationFuture := workflow.ExecuteActivity(ctx, flowable.DropFlowDestination, &protos.DropFlowActivityInput{
FlowJobName: config.FlowJobName,
PeerName: config.DestinationPeerName,
FlowJobName: input.FlowJobName,
PeerName: input.FlowConnectionConfigs.DestinationName,
})

selector.AddFuture(dropDestinationFuture, dropDestination)
if config.DropFlowStats {
dropStatsFuture := workflow.ExecuteActivity(dropStatsCtx, flowable.DeleteMirrorStats, config.FlowJobName)
selector.AddFuture(dropStatsFuture, dropStats)
}

for {
selector.Select(ctx)
Expand All @@ -89,3 +72,45 @@ func DropFlowWorkflow(ctx workflow.Context, config *protos.DropFlowInput) error
}
}
}

func DropFlowWorkflow(ctx workflow.Context, input *protos.DropFlowInput) error {
ctx = workflow.WithValue(ctx, shared.FlowNameKey, input.FlowJobName)
workflow.GetLogger(ctx).Info("performing cleanup for flow",
slog.String(string(shared.FlowNameKey), input.FlowJobName))

if input.FlowConnectionConfigs != nil && input.DropFlowStats {
dropStatsCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
StartToCloseTimeout: 1 * time.Minute,
HeartbeatTimeout: 1 * time.Minute,
})
dropStatsFuture := workflow.ExecuteActivity(dropStatsCtx,
flowable.DeleteMirrorStats, input.FlowJobName)
err := dropStatsFuture.Get(dropStatsCtx, nil)
if err != nil {
workflow.GetLogger(ctx).Error("failed to delete mirror stats", slog.Any("error", err))
return err
}
}

removeFlowEntriesCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
StartToCloseTimeout: 1 * time.Minute,
})
removeFromCatalogFuture := workflow.ExecuteActivity(removeFlowEntriesCtx,
flowable.RemoveFlowEntryFromCatalog, input.FlowJobName)
err := removeFromCatalogFuture.Get(ctx, nil)
if err != nil {
workflow.GetLogger(ctx).Error("failed to remove flow entries from catalog", slog.Any("error", err))
return err
}

if input.FlowConnectionConfigs != nil {
err := executeCDCDropActivities(ctx, input)
if err != nil {
workflow.GetLogger(ctx).Error("failed to drop CDC flow", slog.Any("error", err))
return err
}
workflow.GetLogger(ctx).Info("CDC flow dropped successfully")
}

return nil
}
4 changes: 2 additions & 2 deletions protos/flow.proto
Original file line number Diff line number Diff line change
Expand Up @@ -333,10 +333,10 @@ message QRepParitionResult {
}

message DropFlowInput {
reserved 2,3;
string flow_job_name = 1;
string source_peer_name = 2;
string destination_peer_name = 3;
bool drop_flow_stats = 4;
FlowConnectionConfigs flow_connection_configs = 5;
}

message TableSchemaDelta {
Expand Down

0 comments on commit 4a6bd86

Please sign in to comment.