Skip to content

Commit

Permalink
Wait for the workflow to actually close before proceeding (#417)
Browse files Browse the repository at this point in the history
fix - #415
  • Loading branch information
iskakaushik authored Sep 25, 2023
1 parent 41cf2e6 commit bb69450
Show file tree
Hide file tree
Showing 5 changed files with 58 additions and 6 deletions.
43 changes: 43 additions & 0 deletions flow/cmd/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@ package main
import (
"context"
"fmt"
"time"

"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/shared"
peerflow "github.com/PeerDB-io/peer-flow/workflows"
backoff "github.com/cenkalti/backoff/v4"
"github.com/google/uuid"
"github.com/jackc/pgx/v5/pgxpool"
log "github.com/sirupsen/logrus"
Expand Down Expand Up @@ -119,6 +121,11 @@ func (h *FlowRequestHandler) ShutdownFlow(
return nil, fmt.Errorf("unable to signal PeerFlow workflow: %w", err)
}

err = h.waitForWorkflowClose(ctx, req.WorkflowId)
if err != nil {
return nil, fmt.Errorf("unable to wait for PeerFlow workflow to close: %w", err)
}

workflowID := fmt.Sprintf("%s-dropflow-%s", req.FlowJobName, uuid.New())
workflowOptions := client.StartWorkflowOptions{
ID: workflowID,
Expand All @@ -143,6 +150,42 @@ func (h *FlowRequestHandler) ShutdownFlow(
}, nil
}

func (h *FlowRequestHandler) waitForWorkflowClose(ctx context.Context, workflowID string) error {
expBackoff := backoff.NewExponentialBackOff()
expBackoff.InitialInterval = 5 * time.Second
expBackoff.MaxInterval = 30 * time.Second
expBackoff.MaxElapsedTime = 5 * time.Minute

// empty will terminate the latest run
runID := ""

operation := func() error {
workflowRes, err := h.temporalClient.DescribeWorkflowExecution(ctx, workflowID, runID)
if err != nil {
// Permanent error will stop the retries
return backoff.Permanent(fmt.Errorf("unable to describe PeerFlow workflow: %w", err))
}

if workflowRes.WorkflowExecutionInfo.CloseTime != nil {
return nil
}

return fmt.Errorf("workflow - %s not closed yet: %v", workflowID, workflowRes)
}

err := backoff.Retry(operation, expBackoff)
if err != nil {
// terminate workflow if it is still running
reason := "PeerFlow workflow did not close in time"
err = h.temporalClient.TerminateWorkflow(ctx, workflowID, runID, reason)
if err != nil {
return fmt.Errorf("unable to terminate PeerFlow workflow: %w", err)
}
}

return nil
}

func (h *FlowRequestHandler) ListPeers(
ctx context.Context,
req *protos.ListPeersRequest,
Expand Down
6 changes: 0 additions & 6 deletions flow/connectors/eventhub/eventhub.go
Original file line number Diff line number Diff line change
Expand Up @@ -380,12 +380,6 @@ func (c *EventHubConnector) SetupNormalizedTables(
}, nil
}

func (c *EventHubConnector) SyncFlowCleanup(jobName string) error {
_, err := c.pgMetadata.pool.Exec(c.ctx, fmt.Sprintf("DROP SCHEMA IF EXISTS %s CASCADE",
metadataSchema))
return err
}

func eventDataFromString(s string) *azeventhubs.EventData {
return &azeventhubs.EventData{
Body: []byte(s),
Expand Down
8 changes: 8 additions & 0 deletions flow/connectors/eventhub/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,3 +226,11 @@ func (c *EventHubConnector) incrementSyncBatchID(jobName string) error {

return nil
}

func (c *EventHubConnector) SyncFlowCleanup(jobName string) error {
_, err := c.pgMetadata.pool.Exec(c.ctx, `
DELETE FROM `+metadataSchema+`.`+lastSyncStateTableName+`
WHERE job_name = $1
`, jobName)
return err
}
1 change: 1 addition & 0 deletions flow/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ require (
)

require (
github.com/cenkalti/backoff/v4 v4.2.1 // indirect
github.com/golang-jwt/jwt/v5 v5.0.0 // indirect
github.com/grafana/pyroscope-go/godeltaprof v0.1.4 // indirect
github.com/grpc-ecosystem/grpc-gateway v1.16.0 // indirect
Expand Down
6 changes: 6 additions & 0 deletions flow/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -867,6 +867,8 @@ github.com/boombuler/barcode v1.0.0/go.mod h1:paBWMcWSl3LHKBqUq+rly7CNSldXjb2rDl
github.com/boombuler/barcode v1.0.1/go.mod h1:paBWMcWSl3LHKBqUq+rly7CNSldXjb2rDl3JlRe0mD8=
github.com/cactus/go-statsd-client/statsd v0.0.0-20200423205355-cb0885a1018c/go.mod h1:l/bIBLeOl9eX+wxJAzxS4TveKRtAqlyDpHjhkfO0MEI=
github.com/cactus/go-statsd-client/v5 v5.0.0/go.mod h1:COEvJ1E+/E2L4q6QE5CkjWPi4eeDw9maJBMIuMPBZbY=
github.com/cenkalti/backoff/v4 v4.2.1 h1:y4OZtCnogmCPw98Zjyt5a6+QwPLGkiQsYW5oUqylYbM=
github.com/cenkalti/backoff/v4 v4.2.1/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/census-instrumentation/opencensus-proto v0.3.0/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/census-instrumentation/opencensus-proto v0.4.1/go.mod h1:4T9NM4+4Vw91VeyqjLS6ao50K5bOcLKN6Q42XnYaRYw=
Expand Down Expand Up @@ -949,6 +951,7 @@ github.com/go-latex/latex v0.0.0-20210823091927-c0d11ff05a81/go.mod h1:SX0U8uGpx
github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9GBnD5lWE=
github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk=
github.com/go-logfmt/logfmt v0.5.0/go.mod h1:wCYkCAKZfumFQihp8CzCvQ3paCTfi41vtzG1KdI/P7A=
github.com/go-logfmt/logfmt v0.5.1/go.mod h1:WYhtIu8zTZfxdn5+rREduYbwxfcBr/Vr6KEVveWlfTs=
github.com/go-pdf/fpdf v0.5.0/go.mod h1:HzcnA+A23uwogo0tp9yU+l3V+KXhiESpt1PMayhOh5M=
github.com/go-pdf/fpdf v0.6.0/go.mod h1:HzcnA+A23uwogo0tp9yU+l3V+KXhiESpt1PMayhOh5M=
github.com/go-sql-driver/mysql v1.6.0 h1:BCTh4TKNUYmOmMUcQ3IipzF5prigylS7XXjEkfCHuOE=
Expand All @@ -969,6 +972,7 @@ github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69
github.com/gogo/status v1.1.0/go.mod h1:BFv9nrluPLmrS0EmGVvLaPNmRosr9KapBYd5/hpY1WM=
github.com/gogo/status v1.1.1 h1:DuHXlSFHNKqTQ+/ACf5Vs6r4X/dH2EgIzR9Vr+H65kg=
github.com/gogo/status v1.1.1/go.mod h1:jpG3dM5QPcqu19Hg8lkUhBFBa3TcLs1DG7+2Jqci7oU=
github.com/golang-jwt/jwt v3.2.1+incompatible/go.mod h1:8pz2t5EyA70fFQQSrl6XZXzqecmYZeUEB8OUGHkxJ+I=
github.com/golang-jwt/jwt/v5 v5.0.0 h1:1n1XNM9hk7O9mnQoNBGolZvzebBQ7p93ULHRc28XJUE=
github.com/golang-jwt/jwt/v5 v5.0.0/go.mod h1:pqrtFR0X4osieyHYxtmOUWsAWrfe1Q5UVIyoH402zdk=
github.com/golang-sql/civil v0.0.0-20220223132316-b832511892a9 h1:au07oEsX2xN0ktxqI+Sida1w446QrXBRJ0nee3SNZlA=
Expand Down Expand Up @@ -1148,6 +1152,7 @@ github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX
github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU=
github.com/json-iterator/go v1.1.10/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
github.com/json-iterator/go v1.1.11/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo=
github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1:6v2b51hI/fHJwM22ozAgKL4VKDeJcHhJFhtBdhmNjmU=
github.com/jstemmer/go-junit-report v0.9.1/go.mod h1:Brl9GWCQeLvo8nXZwPNNblvFj/XSXhF0NWZEnDohbsk=
github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w=
Expand Down Expand Up @@ -1207,6 +1212,7 @@ github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJ
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk=
github.com/mtibben/percent v0.2.1 h1:5gssi8Nqo8QU/r2pynCm+hBQHpkB/uNK7BJCFogWdzs=
github.com/mtibben/percent v0.2.1/go.mod h1:KG9uO+SZkUp+VkRHsCdYQV3XSZrrSpR3O9ibNBTZrns=
github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
Expand Down

0 comments on commit bb69450

Please sign in to comment.