diff --git a/flow/cmd/handler.go b/flow/cmd/handler.go index 0365c76a85..b3e8d9c29a 100644 --- a/flow/cmd/handler.go +++ b/flow/cmd/handler.go @@ -3,10 +3,12 @@ package main import ( "context" "fmt" + "time" "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/PeerDB-io/peer-flow/shared" peerflow "github.com/PeerDB-io/peer-flow/workflows" + backoff "github.com/cenkalti/backoff/v4" "github.com/google/uuid" "github.com/jackc/pgx/v5/pgxpool" log "github.com/sirupsen/logrus" @@ -119,6 +121,11 @@ func (h *FlowRequestHandler) ShutdownFlow( return nil, fmt.Errorf("unable to signal PeerFlow workflow: %w", err) } + err = h.waitForWorkflowClose(ctx, req.WorkflowId) + if err != nil { + return nil, fmt.Errorf("unable to wait for PeerFlow workflow to close: %w", err) + } + workflowID := fmt.Sprintf("%s-dropflow-%s", req.FlowJobName, uuid.New()) workflowOptions := client.StartWorkflowOptions{ ID: workflowID, @@ -143,6 +150,42 @@ func (h *FlowRequestHandler) ShutdownFlow( }, nil } +func (h *FlowRequestHandler) waitForWorkflowClose(ctx context.Context, workflowID string) error { + expBackoff := backoff.NewExponentialBackOff() + expBackoff.InitialInterval = 5 * time.Second + expBackoff.MaxInterval = 30 * time.Second + expBackoff.MaxElapsedTime = 5 * time.Minute + + // empty will terminate the latest run + runID := "" + + operation := func() error { + workflowRes, err := h.temporalClient.DescribeWorkflowExecution(ctx, workflowID, runID) + if err != nil { + // Permanent error will stop the retries + return backoff.Permanent(fmt.Errorf("unable to describe PeerFlow workflow: %w", err)) + } + + if workflowRes.WorkflowExecutionInfo.CloseTime != nil { + return nil + } + + return fmt.Errorf("workflow - %s not closed yet: %v", workflowID, workflowRes) + } + + err := backoff.Retry(operation, expBackoff) + if err != nil { + // terminate workflow if it is still running + reason := "PeerFlow workflow did not close in time" + err = h.temporalClient.TerminateWorkflow(ctx, workflowID, runID, reason) + if err != nil { + return fmt.Errorf("unable to terminate PeerFlow workflow: %w", err) + } + } + + return nil +} + func (h *FlowRequestHandler) ListPeers( ctx context.Context, req *protos.ListPeersRequest, diff --git a/flow/connectors/eventhub/eventhub.go b/flow/connectors/eventhub/eventhub.go index 6364dd7c66..158323e1a0 100644 --- a/flow/connectors/eventhub/eventhub.go +++ b/flow/connectors/eventhub/eventhub.go @@ -380,12 +380,6 @@ func (c *EventHubConnector) SetupNormalizedTables( }, nil } -func (c *EventHubConnector) SyncFlowCleanup(jobName string) error { - _, err := c.pgMetadata.pool.Exec(c.ctx, fmt.Sprintf("DROP SCHEMA IF EXISTS %s CASCADE", - metadataSchema)) - return err -} - func eventDataFromString(s string) *azeventhubs.EventData { return &azeventhubs.EventData{ Body: []byte(s), diff --git a/flow/connectors/eventhub/metadata.go b/flow/connectors/eventhub/metadata.go index 72410bce62..178ebbb076 100644 --- a/flow/connectors/eventhub/metadata.go +++ b/flow/connectors/eventhub/metadata.go @@ -226,3 +226,11 @@ func (c *EventHubConnector) incrementSyncBatchID(jobName string) error { return nil } + +func (c *EventHubConnector) SyncFlowCleanup(jobName string) error { + _, err := c.pgMetadata.pool.Exec(c.ctx, ` + DELETE FROM `+metadataSchema+`.`+lastSyncStateTableName+` + WHERE job_name = $1 + `, jobName) + return err +} diff --git a/flow/go.mod b/flow/go.mod index 6c0dc857d2..de82d7c8e0 100644 --- a/flow/go.mod +++ b/flow/go.mod @@ -36,6 +36,7 @@ require ( ) require ( + github.com/cenkalti/backoff/v4 v4.2.1 // indirect github.com/golang-jwt/jwt/v5 v5.0.0 // indirect github.com/grafana/pyroscope-go/godeltaprof v0.1.4 // indirect github.com/grpc-ecosystem/grpc-gateway v1.16.0 // indirect diff --git a/flow/go.sum b/flow/go.sum index d9dcb4ceb2..f9bf5d4853 100644 --- a/flow/go.sum +++ b/flow/go.sum @@ -867,6 +867,8 @@ github.com/boombuler/barcode v1.0.0/go.mod h1:paBWMcWSl3LHKBqUq+rly7CNSldXjb2rDl github.com/boombuler/barcode v1.0.1/go.mod h1:paBWMcWSl3LHKBqUq+rly7CNSldXjb2rDl3JlRe0mD8= github.com/cactus/go-statsd-client/statsd v0.0.0-20200423205355-cb0885a1018c/go.mod h1:l/bIBLeOl9eX+wxJAzxS4TveKRtAqlyDpHjhkfO0MEI= github.com/cactus/go-statsd-client/v5 v5.0.0/go.mod h1:COEvJ1E+/E2L4q6QE5CkjWPi4eeDw9maJBMIuMPBZbY= +github.com/cenkalti/backoff/v4 v4.2.1 h1:y4OZtCnogmCPw98Zjyt5a6+QwPLGkiQsYW5oUqylYbM= +github.com/cenkalti/backoff/v4 v4.2.1/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= github.com/census-instrumentation/opencensus-proto v0.3.0/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= github.com/census-instrumentation/opencensus-proto v0.4.1/go.mod h1:4T9NM4+4Vw91VeyqjLS6ao50K5bOcLKN6Q42XnYaRYw= @@ -949,6 +951,7 @@ github.com/go-latex/latex v0.0.0-20210823091927-c0d11ff05a81/go.mod h1:SX0U8uGpx github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9GBnD5lWE= github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk= github.com/go-logfmt/logfmt v0.5.0/go.mod h1:wCYkCAKZfumFQihp8CzCvQ3paCTfi41vtzG1KdI/P7A= +github.com/go-logfmt/logfmt v0.5.1/go.mod h1:WYhtIu8zTZfxdn5+rREduYbwxfcBr/Vr6KEVveWlfTs= github.com/go-pdf/fpdf v0.5.0/go.mod h1:HzcnA+A23uwogo0tp9yU+l3V+KXhiESpt1PMayhOh5M= github.com/go-pdf/fpdf v0.6.0/go.mod h1:HzcnA+A23uwogo0tp9yU+l3V+KXhiESpt1PMayhOh5M= github.com/go-sql-driver/mysql v1.6.0 h1:BCTh4TKNUYmOmMUcQ3IipzF5prigylS7XXjEkfCHuOE= @@ -969,6 +972,7 @@ github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69 github.com/gogo/status v1.1.0/go.mod h1:BFv9nrluPLmrS0EmGVvLaPNmRosr9KapBYd5/hpY1WM= github.com/gogo/status v1.1.1 h1:DuHXlSFHNKqTQ+/ACf5Vs6r4X/dH2EgIzR9Vr+H65kg= github.com/gogo/status v1.1.1/go.mod h1:jpG3dM5QPcqu19Hg8lkUhBFBa3TcLs1DG7+2Jqci7oU= +github.com/golang-jwt/jwt v3.2.1+incompatible/go.mod h1:8pz2t5EyA70fFQQSrl6XZXzqecmYZeUEB8OUGHkxJ+I= github.com/golang-jwt/jwt/v5 v5.0.0 h1:1n1XNM9hk7O9mnQoNBGolZvzebBQ7p93ULHRc28XJUE= github.com/golang-jwt/jwt/v5 v5.0.0/go.mod h1:pqrtFR0X4osieyHYxtmOUWsAWrfe1Q5UVIyoH402zdk= github.com/golang-sql/civil v0.0.0-20220223132316-b832511892a9 h1:au07oEsX2xN0ktxqI+Sida1w446QrXBRJ0nee3SNZlA= @@ -1148,6 +1152,7 @@ github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU= github.com/json-iterator/go v1.1.10/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4= github.com/json-iterator/go v1.1.11/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4= +github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1:6v2b51hI/fHJwM22ozAgKL4VKDeJcHhJFhtBdhmNjmU= github.com/jstemmer/go-junit-report v0.9.1/go.mod h1:Brl9GWCQeLvo8nXZwPNNblvFj/XSXhF0NWZEnDohbsk= github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w= @@ -1207,6 +1212,7 @@ github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJ github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= +github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= github.com/mtibben/percent v0.2.1 h1:5gssi8Nqo8QU/r2pynCm+hBQHpkB/uNK7BJCFogWdzs= github.com/mtibben/percent v0.2.1/go.mod h1:KG9uO+SZkUp+VkRHsCdYQV3XSZrrSpR3O9ibNBTZrns= github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=