diff --git a/.github/workflows/flow.yml b/.github/workflows/flow.yml index 70f65df13f..dffa4e0840 100644 --- a/.github/workflows/flow.yml +++ b/.github/workflows/flow.yml @@ -95,7 +95,7 @@ jobs: - name: run tests run: | - gotestsum --format testname -- -p 8 ./... -timeout 2400s + gotestsum --format testname -- -p 8 ./... -timeout 1200s working-directory: ./flow env: AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }} diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go index 8fe8e4be2b..1c71c26b21 100644 --- a/flow/activities/flowable.go +++ b/flow/activities/flowable.go @@ -176,7 +176,6 @@ func (a *FlowableActivity) CreateNormalizedTable( return setupNormalizedTablesOutput, nil } -// StartFlow implements StartFlow. func (a *FlowableActivity) StartFlow(ctx context.Context, input *protos.StartFlowInput, ) (*model.SyncResponse, error) { diff --git a/flow/connectors/bigquery/bigquery.go b/flow/connectors/bigquery/bigquery.go index 2749566ced..b8831be301 100644 --- a/flow/connectors/bigquery/bigquery.go +++ b/flow/connectors/bigquery/bigquery.go @@ -500,16 +500,15 @@ func (r StagingBQRecord) Save() (map[string]bigquery.Value, string, error) { } // SyncRecords pushes records to the destination. -// currently only supports inserts,updates and deletes -// more record types will be added in the future. +// Currently only supports inserts, updates, and deletes. +// More record types will be added in the future. func (c *BigQueryConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.SyncResponse, error) { rawTableName := c.getRawTableName(req.FlowJobName) c.logger.Info(fmt.Sprintf("pushing records to %s.%s...", c.datasetID, rawTableName)) - // generate a sequential number for the last synced batch - // this sequence will be used to keep track of records that are normalized - // in the NormalizeFlowWorkflow + // generate a sequential number for last synced batch this sequence will be + // used to keep track of records that are normalized in NormalizeFlowWorkflow syncBatchID, err := c.GetLastSyncBatchID(req.FlowJobName) if err != nil { return nil, fmt.Errorf("failed to get batch for the current mirror: %v", err) diff --git a/flow/e2e/congen.go b/flow/e2e/congen.go index e881dd5ead..df1ff17c13 100644 --- a/flow/e2e/congen.go +++ b/flow/e2e/congen.go @@ -115,7 +115,7 @@ func SetupPostgres(suffix string) (*pgxpool.Pool, error) { $$ language sql; CREATE OR REPLACE FUNCTION random_bytea(bytea_length integer) RETURNS bytea AS $body$ - SELECT decode(string_agg(lpad(to_hex(width_bucket(random(), 0, 1, 256)-1),2,'0') ,''), 'hex') + SELECT decode(string_agg(lpad(to_hex(width_bucket(random(), 0, 1, 256)-1),2,'0'), ''), 'hex') FROM generate_series(1, $1); $body$ LANGUAGE 'sql' diff --git a/flow/workflows/normalize_flow.go b/flow/workflows/normalize_flow.go new file mode 100644 index 0000000000..af14e11b8f --- /dev/null +++ b/flow/workflows/normalize_flow.go @@ -0,0 +1,66 @@ +package peerflow + +import ( + "fmt" + "time" + + "github.com/PeerDB-io/peer-flow/generated/protos" + "github.com/PeerDB-io/peer-flow/model" + "go.temporal.io/sdk/log" + "go.temporal.io/sdk/workflow" +) + +type NormalizeFlowState struct { + CDCFlowName string + Progress []string +} + +type NormalizeFlowExecution struct { + NormalizeFlowState + executionID string + logger log.Logger +} + +func NewNormalizeFlowExecution(ctx workflow.Context, state *NormalizeFlowState) *NormalizeFlowExecution { + return &NormalizeFlowExecution{ + NormalizeFlowState: *state, + executionID: workflow.GetInfo(ctx).WorkflowExecution.ID, + logger: workflow.GetLogger(ctx), + } +} + +func NormalizeFlowWorkflow(ctx workflow.Context, + config *protos.FlowConnectionConfigs, +) (*model.NormalizeResponse, error) { + s := NewNormalizeFlowExecution(ctx, &NormalizeFlowState{ + CDCFlowName: config.FlowJobName, + Progress: []string{}, + }) + + return s.executeNormalizeFlow(ctx, config) +} + +func (s *NormalizeFlowExecution) executeNormalizeFlow( + ctx workflow.Context, + config *protos.FlowConnectionConfigs, +) (*model.NormalizeResponse, error) { + s.logger.Info("executing normalize flow - ", s.CDCFlowName) + + normalizeFlowCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ + StartToCloseTimeout: 7 * 24 * time.Hour, + HeartbeatTimeout: 5 * time.Minute, + }) + + // execute StartFlow on the peers to start the flow + startNormalizeInput := &protos.StartNormalizeInput{ + FlowConnectionConfigs: config, + } + fStartNormalize := workflow.ExecuteActivity(normalizeFlowCtx, flowable.StartNormalize, startNormalizeInput) + + var normalizeResponse *model.NormalizeResponse + if err := fStartNormalize.Get(normalizeFlowCtx, &normalizeResponse); err != nil { + return nil, fmt.Errorf("failed to flow: %w", err) + } + + return normalizeResponse, nil +} diff --git a/flow/workflows/sync_flow.go b/flow/workflows/sync_flow.go index 0b207bc65f..3ee45aecf9 100644 --- a/flow/workflows/sync_flow.go +++ b/flow/workflows/sync_flow.go @@ -21,17 +21,6 @@ type SyncFlowExecution struct { logger log.Logger } -type NormalizeFlowState struct { - CDCFlowName string - Progress []string -} - -type NormalizeFlowExecution struct { - NormalizeFlowState - executionID string - logger log.Logger -} - // NewSyncFlowExecution creates a new instance of SyncFlowExecution. func NewSyncFlowExecution(ctx workflow.Context, state *SyncFlowState) *SyncFlowExecution { return &SyncFlowExecution{ @@ -41,14 +30,6 @@ func NewSyncFlowExecution(ctx workflow.Context, state *SyncFlowState) *SyncFlowE } } -func NewNormalizeFlowExecution(ctx workflow.Context, state *NormalizeFlowState) *NormalizeFlowExecution { - return &NormalizeFlowExecution{ - NormalizeFlowState: *state, - executionID: workflow.GetInfo(ctx).WorkflowExecution.ID, - logger: workflow.GetLogger(ctx), - } -} - // executeSyncFlow executes the sync flow. func (s *SyncFlowExecution) executeSyncFlow( ctx workflow.Context, @@ -131,39 +112,3 @@ func SyncFlowWorkflow(ctx workflow.Context, return s.executeSyncFlow(ctx, config, options, options.RelationMessageMapping) } - -func NormalizeFlowWorkflow(ctx workflow.Context, - config *protos.FlowConnectionConfigs, -) (*model.NormalizeResponse, error) { - s := NewNormalizeFlowExecution(ctx, &NormalizeFlowState{ - CDCFlowName: config.FlowJobName, - Progress: []string{}, - }) - - return s.executeNormalizeFlow(ctx, config) -} - -func (s *NormalizeFlowExecution) executeNormalizeFlow( - ctx workflow.Context, - config *protos.FlowConnectionConfigs, -) (*model.NormalizeResponse, error) { - s.logger.Info("executing normalize flow - ", s.CDCFlowName) - - normalizeFlowCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ - StartToCloseTimeout: 7 * 24 * time.Hour, - HeartbeatTimeout: 5 * time.Minute, - }) - - // execute StartFlow on the peers to start the flow - startNormalizeInput := &protos.StartNormalizeInput{ - FlowConnectionConfigs: config, - } - fStartNormalize := workflow.ExecuteActivity(normalizeFlowCtx, flowable.StartNormalize, startNormalizeInput) - - var normalizeResponse *model.NormalizeResponse - if err := fStartNormalize.Get(normalizeFlowCtx, &normalizeResponse); err != nil { - return nil, fmt.Errorf("failed to flow: %w", err) - } - - return normalizeResponse, nil -}