Skip to content

Commit

Permalink
Split normalize flow logic out of sync_flow.go (#886)
Browse files Browse the repository at this point in the history
Refactoring to split up #754
  • Loading branch information
serprex authored Dec 22, 2023
1 parent 0cb7abe commit 9c4f9cc
Show file tree
Hide file tree
Showing 6 changed files with 72 additions and 63 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/flow.yml
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ jobs:

- name: run tests
run: |
gotestsum --format testname -- -p 8 ./... -timeout 2400s
gotestsum --format testname -- -p 8 ./... -timeout 1200s
working-directory: ./flow
env:
AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }}
Expand Down
1 change: 0 additions & 1 deletion flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,6 @@ func (a *FlowableActivity) CreateNormalizedTable(
return setupNormalizedTablesOutput, nil
}

// StartFlow implements StartFlow.
func (a *FlowableActivity) StartFlow(ctx context.Context,
input *protos.StartFlowInput,
) (*model.SyncResponse, error) {
Expand Down
9 changes: 4 additions & 5 deletions flow/connectors/bigquery/bigquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -500,16 +500,15 @@ func (r StagingBQRecord) Save() (map[string]bigquery.Value, string, error) {
}

// SyncRecords pushes records to the destination.
// currently only supports inserts,updates and deletes
// more record types will be added in the future.
// Currently only supports inserts, updates, and deletes.
// More record types will be added in the future.
func (c *BigQueryConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.SyncResponse, error) {
rawTableName := c.getRawTableName(req.FlowJobName)

c.logger.Info(fmt.Sprintf("pushing records to %s.%s...", c.datasetID, rawTableName))

// generate a sequential number for the last synced batch
// this sequence will be used to keep track of records that are normalized
// in the NormalizeFlowWorkflow
// generate a sequential number for the last synced batch; this sequence will be
// used to keep track of records that are normalized in NormalizeFlowWorkflow
syncBatchID, err := c.GetLastSyncBatchID(req.FlowJobName)
if err != nil {
return nil, fmt.Errorf("failed to get batch for the current mirror: %v", err)
Expand Down
2 changes: 1 addition & 1 deletion flow/e2e/congen.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ func SetupPostgres(suffix string) (*pgxpool.Pool, error) {
$$ language sql;
CREATE OR REPLACE FUNCTION random_bytea(bytea_length integer)
RETURNS bytea AS $body$
SELECT decode(string_agg(lpad(to_hex(width_bucket(random(), 0, 1, 256)-1),2,'0') ,''), 'hex')
SELECT decode(string_agg(lpad(to_hex(width_bucket(random(), 0, 1, 256)-1),2,'0'), ''), 'hex')
FROM generate_series(1, $1);
$body$
LANGUAGE 'sql'
Expand Down
66 changes: 66 additions & 0 deletions flow/workflows/normalize_flow.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package peerflow

import (
"fmt"
"time"

"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/model"
"go.temporal.io/sdk/log"
"go.temporal.io/sdk/workflow"
)

// NormalizeFlowState is the state carried by a normalize flow execution.
type NormalizeFlowState struct {
	// CDCFlowName is the name of the CDC flow being normalized;
	// NormalizeFlowWorkflow fills it from the config's FlowJobName.
	CDCFlowName string
	// Progress starts out as an empty list in NormalizeFlowWorkflow.
	// NOTE(review): presumably holds progress messages — confirm against callers.
	Progress []string
}

// NormalizeFlowExecution pairs the normalize flow state with per-execution
// metadata: the workflow execution ID and the workflow logger.
type NormalizeFlowExecution struct {
	NormalizeFlowState
	// executionID is the ID of the workflow execution this value belongs to.
	executionID string
	// logger is the logger obtained from the workflow context.
	logger log.Logger
}

// NewNormalizeFlowExecution creates a new instance of NormalizeFlowExecution.
// It copies the given state and records the workflow execution ID and the
// workflow logger taken from ctx.
func NewNormalizeFlowExecution(ctx workflow.Context, state *NormalizeFlowState) *NormalizeFlowExecution {
	execution := new(NormalizeFlowExecution)
	execution.NormalizeFlowState = *state
	execution.executionID = workflow.GetInfo(ctx).WorkflowExecution.ID
	execution.logger = workflow.GetLogger(ctx)
	return execution
}

// NormalizeFlowWorkflow is the workflow entry point for normalizing a flow.
// It builds a fresh execution named after config.FlowJobName, with an empty
// progress list, and delegates to executeNormalizeFlow.
func NormalizeFlowWorkflow(ctx workflow.Context,
	config *protos.FlowConnectionConfigs,
) (*model.NormalizeResponse, error) {
	initialState := NormalizeFlowState{
		CDCFlowName: config.FlowJobName,
		Progress:    []string{},
	}
	execution := NewNormalizeFlowExecution(ctx, &initialState)

	return execution.executeNormalizeFlow(ctx, config)
}

// executeNormalizeFlow runs the StartNormalize activity for the given flow
// configuration and returns the activity's response.
//
// The activity is scheduled with a 7-day StartToCloseTimeout and a 5-minute
// HeartbeatTimeout. If the activity fails, the error is wrapped and returned
// together with a nil response.
func (s *NormalizeFlowExecution) executeNormalizeFlow(
	ctx workflow.Context,
	config *protos.FlowConnectionConfigs,
) (*model.NormalizeResponse, error) {
	// The workflow logger takes a message followed by key/value pairs, so the
	// flow name is logged under an explicit key instead of as a dangling value.
	s.logger.Info("executing normalize flow", "flowName", s.CDCFlowName)

	normalizeFlowCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
		StartToCloseTimeout: 7 * 24 * time.Hour,
		HeartbeatTimeout:    5 * time.Minute,
	})

	// execute the StartNormalize activity with the flow's connection configs
	startNormalizeInput := &protos.StartNormalizeInput{
		FlowConnectionConfigs: config,
	}
	fStartNormalize := workflow.ExecuteActivity(normalizeFlowCtx, flowable.StartNormalize, startNormalizeInput)

	var normalizeResponse *model.NormalizeResponse
	if err := fStartNormalize.Get(normalizeFlowCtx, &normalizeResponse); err != nil {
		return nil, fmt.Errorf("failed to normalize flow: %w", err)
	}

	return normalizeResponse, nil
}
55 changes: 0 additions & 55 deletions flow/workflows/sync_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,6 @@ type SyncFlowExecution struct {
logger log.Logger
}

// NormalizeFlowState is the state carried by a normalize flow execution.
type NormalizeFlowState struct {
	// CDCFlowName is the name of the CDC flow being normalized;
	// NormalizeFlowWorkflow fills it from the config's FlowJobName.
	CDCFlowName string
	// Progress starts out as an empty list in NormalizeFlowWorkflow.
	// NOTE(review): presumably holds progress messages — confirm against callers.
	Progress []string
}

// NormalizeFlowExecution pairs the normalize flow state with per-execution
// metadata: the workflow execution ID and the workflow logger.
type NormalizeFlowExecution struct {
	NormalizeFlowState
	// executionID is the ID of the workflow execution this value belongs to.
	executionID string
	// logger is the logger obtained from the workflow context.
	logger log.Logger
}

// NewSyncFlowExecution creates a new instance of SyncFlowExecution.
func NewSyncFlowExecution(ctx workflow.Context, state *SyncFlowState) *SyncFlowExecution {
return &SyncFlowExecution{
Expand All @@ -41,14 +30,6 @@ func NewSyncFlowExecution(ctx workflow.Context, state *SyncFlowState) *SyncFlowE
}
}

// NewNormalizeFlowExecution creates a new instance of NormalizeFlowExecution.
// It copies the given state and records the workflow execution ID and the
// workflow logger taken from ctx.
func NewNormalizeFlowExecution(ctx workflow.Context, state *NormalizeFlowState) *NormalizeFlowExecution {
	execution := new(NormalizeFlowExecution)
	execution.NormalizeFlowState = *state
	execution.executionID = workflow.GetInfo(ctx).WorkflowExecution.ID
	execution.logger = workflow.GetLogger(ctx)
	return execution
}

// executeSyncFlow executes the sync flow.
func (s *SyncFlowExecution) executeSyncFlow(
ctx workflow.Context,
Expand Down Expand Up @@ -131,39 +112,3 @@ func SyncFlowWorkflow(ctx workflow.Context,

return s.executeSyncFlow(ctx, config, options, options.RelationMessageMapping)
}

// NormalizeFlowWorkflow is the workflow entry point for normalizing a flow.
// It builds a fresh execution named after config.FlowJobName, with an empty
// progress list, and delegates to executeNormalizeFlow.
func NormalizeFlowWorkflow(ctx workflow.Context,
	config *protos.FlowConnectionConfigs,
) (*model.NormalizeResponse, error) {
	initialState := NormalizeFlowState{
		CDCFlowName: config.FlowJobName,
		Progress:    []string{},
	}
	execution := NewNormalizeFlowExecution(ctx, &initialState)

	return execution.executeNormalizeFlow(ctx, config)
}

// executeNormalizeFlow runs the StartNormalize activity for the given flow
// configuration and returns the activity's response.
//
// The activity is scheduled with a 7-day StartToCloseTimeout and a 5-minute
// HeartbeatTimeout. If the activity fails, the error is wrapped and returned
// together with a nil response.
func (s *NormalizeFlowExecution) executeNormalizeFlow(
	ctx workflow.Context,
	config *protos.FlowConnectionConfigs,
) (*model.NormalizeResponse, error) {
	// The workflow logger takes a message followed by key/value pairs, so the
	// flow name is logged under an explicit key instead of as a dangling value.
	s.logger.Info("executing normalize flow", "flowName", s.CDCFlowName)

	normalizeFlowCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
		StartToCloseTimeout: 7 * 24 * time.Hour,
		HeartbeatTimeout:    5 * time.Minute,
	})

	// execute the StartNormalize activity with the flow's connection configs
	startNormalizeInput := &protos.StartNormalizeInput{
		FlowConnectionConfigs: config,
	}
	fStartNormalize := workflow.ExecuteActivity(normalizeFlowCtx, flowable.StartNormalize, startNormalizeInput)

	var normalizeResponse *model.NormalizeResponse
	if err := fStartNormalize.Get(normalizeFlowCtx, &normalizeResponse); err != nil {
		return nil, fmt.Errorf("failed to normalize flow: %w", err)
	}

	return normalizeResponse, nil
}

0 comments on commit 9c4f9cc

Please sign in to comment.