Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Wait for the workflow to actually close before proceeding #417

Merged
merged 3 commits into from
Sep 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 43 additions & 0 deletions flow/cmd/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@ package main
import (
"context"
"fmt"
"time"

"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/shared"
peerflow "github.com/PeerDB-io/peer-flow/workflows"
backoff "github.com/cenkalti/backoff/v4"
"github.com/google/uuid"
"github.com/jackc/pgx/v5/pgxpool"
log "github.com/sirupsen/logrus"
Expand Down Expand Up @@ -119,6 +121,11 @@ func (h *FlowRequestHandler) ShutdownFlow(
return nil, fmt.Errorf("unable to signal PeerFlow workflow: %w", err)
}

err = h.waitForWorkflowClose(ctx, req.WorkflowId)
if err != nil {
return nil, fmt.Errorf("unable to wait for PeerFlow workflow to close: %w", err)
}

workflowID := fmt.Sprintf("%s-dropflow-%s", req.FlowJobName, uuid.New())
workflowOptions := client.StartWorkflowOptions{
ID: workflowID,
Expand All @@ -143,6 +150,42 @@ func (h *FlowRequestHandler) ShutdownFlow(
}, nil
}

func (h *FlowRequestHandler) waitForWorkflowClose(ctx context.Context, workflowID string) error {
expBackoff := backoff.NewExponentialBackOff()
expBackoff.InitialInterval = 5 * time.Second
expBackoff.MaxInterval = 30 * time.Second
expBackoff.MaxElapsedTime = 5 * time.Minute

// empty will terminate the latest run
runID := ""

operation := func() error {
workflowRes, err := h.temporalClient.DescribeWorkflowExecution(ctx, workflowID, runID)
if err != nil {
// Permanent error will stop the retries
return backoff.Permanent(fmt.Errorf("unable to describe PeerFlow workflow: %w", err))
}

if workflowRes.WorkflowExecutionInfo.CloseTime != nil {
return nil
}

return fmt.Errorf("workflow - %s not closed yet: %v", workflowID, workflowRes)
}

err := backoff.Retry(operation, expBackoff)
if err != nil {
// terminate workflow if it is still running
reason := "PeerFlow workflow did not close in time"
err = h.temporalClient.TerminateWorkflow(ctx, workflowID, runID, reason)
if err != nil {
return fmt.Errorf("unable to terminate PeerFlow workflow: %w", err)
}
}

return nil
}

func (h *FlowRequestHandler) ListPeers(
ctx context.Context,
req *protos.ListPeersRequest,
Expand Down
6 changes: 0 additions & 6 deletions flow/connectors/eventhub/eventhub.go
Original file line number Diff line number Diff line change
Expand Up @@ -380,12 +380,6 @@ func (c *EventHubConnector) SetupNormalizedTables(
}, nil
}

func (c *EventHubConnector) SyncFlowCleanup(jobName string) error {
_, err := c.pgMetadata.pool.Exec(c.ctx, fmt.Sprintf("DROP SCHEMA IF EXISTS %s CASCADE",
metadataSchema))
return err
}

func eventDataFromString(s string) *azeventhubs.EventData {
return &azeventhubs.EventData{
Body: []byte(s),
Expand Down
8 changes: 8 additions & 0 deletions flow/connectors/eventhub/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,3 +226,11 @@ func (c *EventHubConnector) incrementSyncBatchID(jobName string) error {

return nil
}

func (c *EventHubConnector) SyncFlowCleanup(jobName string) error {
_, err := c.pgMetadata.pool.Exec(c.ctx, `
DELETE FROM `+metadataSchema+`.`+lastSyncStateTableName+`
WHERE job_name = $1
`, jobName)
return err
}
1 change: 1 addition & 0 deletions flow/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ require (
)

require (
github.com/cenkalti/backoff/v4 v4.2.1 // indirect
github.com/golang-jwt/jwt/v5 v5.0.0 // indirect
github.com/grafana/pyroscope-go/godeltaprof v0.1.4 // indirect
github.com/grpc-ecosystem/grpc-gateway v1.16.0 // indirect
Expand Down
6 changes: 6 additions & 0 deletions flow/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -867,6 +867,8 @@ github.com/boombuler/barcode v1.0.0/go.mod h1:paBWMcWSl3LHKBqUq+rly7CNSldXjb2rDl
github.com/boombuler/barcode v1.0.1/go.mod h1:paBWMcWSl3LHKBqUq+rly7CNSldXjb2rDl3JlRe0mD8=
github.com/cactus/go-statsd-client/statsd v0.0.0-20200423205355-cb0885a1018c/go.mod h1:l/bIBLeOl9eX+wxJAzxS4TveKRtAqlyDpHjhkfO0MEI=
github.com/cactus/go-statsd-client/v5 v5.0.0/go.mod h1:COEvJ1E+/E2L4q6QE5CkjWPi4eeDw9maJBMIuMPBZbY=
github.com/cenkalti/backoff/v4 v4.2.1 h1:y4OZtCnogmCPw98Zjyt5a6+QwPLGkiQsYW5oUqylYbM=
github.com/cenkalti/backoff/v4 v4.2.1/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/census-instrumentation/opencensus-proto v0.3.0/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/census-instrumentation/opencensus-proto v0.4.1/go.mod h1:4T9NM4+4Vw91VeyqjLS6ao50K5bOcLKN6Q42XnYaRYw=
Expand Down Expand Up @@ -949,6 +951,7 @@ github.com/go-latex/latex v0.0.0-20210823091927-c0d11ff05a81/go.mod h1:SX0U8uGpx
github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9GBnD5lWE=
github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk=
github.com/go-logfmt/logfmt v0.5.0/go.mod h1:wCYkCAKZfumFQihp8CzCvQ3paCTfi41vtzG1KdI/P7A=
github.com/go-logfmt/logfmt v0.5.1/go.mod h1:WYhtIu8zTZfxdn5+rREduYbwxfcBr/Vr6KEVveWlfTs=
github.com/go-pdf/fpdf v0.5.0/go.mod h1:HzcnA+A23uwogo0tp9yU+l3V+KXhiESpt1PMayhOh5M=
github.com/go-pdf/fpdf v0.6.0/go.mod h1:HzcnA+A23uwogo0tp9yU+l3V+KXhiESpt1PMayhOh5M=
github.com/go-sql-driver/mysql v1.6.0 h1:BCTh4TKNUYmOmMUcQ3IipzF5prigylS7XXjEkfCHuOE=
Expand All @@ -969,6 +972,7 @@ github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69
github.com/gogo/status v1.1.0/go.mod h1:BFv9nrluPLmrS0EmGVvLaPNmRosr9KapBYd5/hpY1WM=
github.com/gogo/status v1.1.1 h1:DuHXlSFHNKqTQ+/ACf5Vs6r4X/dH2EgIzR9Vr+H65kg=
github.com/gogo/status v1.1.1/go.mod h1:jpG3dM5QPcqu19Hg8lkUhBFBa3TcLs1DG7+2Jqci7oU=
github.com/golang-jwt/jwt v3.2.1+incompatible/go.mod h1:8pz2t5EyA70fFQQSrl6XZXzqecmYZeUEB8OUGHkxJ+I=
github.com/golang-jwt/jwt/v5 v5.0.0 h1:1n1XNM9hk7O9mnQoNBGolZvzebBQ7p93ULHRc28XJUE=
github.com/golang-jwt/jwt/v5 v5.0.0/go.mod h1:pqrtFR0X4osieyHYxtmOUWsAWrfe1Q5UVIyoH402zdk=
github.com/golang-sql/civil v0.0.0-20220223132316-b832511892a9 h1:au07oEsX2xN0ktxqI+Sida1w446QrXBRJ0nee3SNZlA=
Expand Down Expand Up @@ -1148,6 +1152,7 @@ github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX
github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU=
github.com/json-iterator/go v1.1.10/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
github.com/json-iterator/go v1.1.11/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo=
github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1:6v2b51hI/fHJwM22ozAgKL4VKDeJcHhJFhtBdhmNjmU=
github.com/jstemmer/go-junit-report v0.9.1/go.mod h1:Brl9GWCQeLvo8nXZwPNNblvFj/XSXhF0NWZEnDohbsk=
github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w=
Expand Down Expand Up @@ -1207,6 +1212,7 @@ github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJ
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk=
github.com/mtibben/percent v0.2.1 h1:5gssi8Nqo8QU/r2pynCm+hBQHpkB/uNK7BJCFogWdzs=
github.com/mtibben/percent v0.2.1/go.mod h1:KG9uO+SZkUp+VkRHsCdYQV3XSZrrSpR3O9ibNBTZrns=
github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
Expand Down
Loading