Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Split normalize flow logic out of sync_flow.go #886

Merged
merged 1 commit into from
Dec 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/flow.yml
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ jobs:

- name: run tests
run: |
gotestsum --format testname -- -p 8 ./... -timeout 2400s
gotestsum --format testname -- -p 8 ./... -timeout 1200s
working-directory: ./flow
env:
AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }}
Expand Down
1 change: 0 additions & 1 deletion flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,6 @@ func (a *FlowableActivity) CreateNormalizedTable(
return setupNormalizedTablesOutput, nil
}

// StartFlow implements StartFlow.
func (a *FlowableActivity) StartFlow(ctx context.Context,
input *protos.StartFlowInput,
) (*model.SyncResponse, error) {
Expand Down
9 changes: 4 additions & 5 deletions flow/connectors/bigquery/bigquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -500,16 +500,15 @@ func (r StagingBQRecord) Save() (map[string]bigquery.Value, string, error) {
}

// SyncRecords pushes records to the destination.
// currently only supports inserts,updates and deletes
// more record types will be added in the future.
// Currently only supports inserts, updates, and deletes.
// More record types will be added in the future.
func (c *BigQueryConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.SyncResponse, error) {
rawTableName := c.getRawTableName(req.FlowJobName)

c.logger.Info(fmt.Sprintf("pushing records to %s.%s...", c.datasetID, rawTableName))

// generate a sequential number for the last synced batch
// this sequence will be used to keep track of records that are normalized
// in the NormalizeFlowWorkflow
// generate a sequential number for last synced batch this sequence will be
// used to keep track of records that are normalized in NormalizeFlowWorkflow
syncBatchID, err := c.GetLastSyncBatchID(req.FlowJobName)
if err != nil {
return nil, fmt.Errorf("failed to get batch for the current mirror: %v", err)
Expand Down
2 changes: 1 addition & 1 deletion flow/e2e/congen.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ func SetupPostgres(suffix string) (*pgxpool.Pool, error) {
$$ language sql;
CREATE OR REPLACE FUNCTION random_bytea(bytea_length integer)
RETURNS bytea AS $body$
SELECT decode(string_agg(lpad(to_hex(width_bucket(random(), 0, 1, 256)-1),2,'0') ,''), 'hex')
SELECT decode(string_agg(lpad(to_hex(width_bucket(random(), 0, 1, 256)-1),2,'0'), ''), 'hex')
FROM generate_series(1, $1);
$body$
LANGUAGE 'sql'
Expand Down
66 changes: 66 additions & 0 deletions flow/workflows/normalize_flow.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package peerflow

import (
"fmt"
"time"

"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/model"
"go.temporal.io/sdk/log"
"go.temporal.io/sdk/workflow"
)

type NormalizeFlowState struct {
CDCFlowName string
Progress []string
}

type NormalizeFlowExecution struct {
NormalizeFlowState
executionID string
logger log.Logger
}

func NewNormalizeFlowExecution(ctx workflow.Context, state *NormalizeFlowState) *NormalizeFlowExecution {
return &NormalizeFlowExecution{
NormalizeFlowState: *state,
executionID: workflow.GetInfo(ctx).WorkflowExecution.ID,
logger: workflow.GetLogger(ctx),
}
}

func NormalizeFlowWorkflow(ctx workflow.Context,
config *protos.FlowConnectionConfigs,
) (*model.NormalizeResponse, error) {
s := NewNormalizeFlowExecution(ctx, &NormalizeFlowState{
CDCFlowName: config.FlowJobName,
Progress: []string{},
})

return s.executeNormalizeFlow(ctx, config)
}

func (s *NormalizeFlowExecution) executeNormalizeFlow(
ctx workflow.Context,
config *protos.FlowConnectionConfigs,
) (*model.NormalizeResponse, error) {
s.logger.Info("executing normalize flow - ", s.CDCFlowName)

normalizeFlowCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
StartToCloseTimeout: 7 * 24 * time.Hour,
HeartbeatTimeout: 5 * time.Minute,
})

// execute StartFlow on the peers to start the flow
startNormalizeInput := &protos.StartNormalizeInput{
FlowConnectionConfigs: config,
}
fStartNormalize := workflow.ExecuteActivity(normalizeFlowCtx, flowable.StartNormalize, startNormalizeInput)

var normalizeResponse *model.NormalizeResponse
if err := fStartNormalize.Get(normalizeFlowCtx, &normalizeResponse); err != nil {
return nil, fmt.Errorf("failed to flow: %w", err)
}

return normalizeResponse, nil
}
55 changes: 0 additions & 55 deletions flow/workflows/sync_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,6 @@ type SyncFlowExecution struct {
logger log.Logger
}

type NormalizeFlowState struct {
CDCFlowName string
Progress []string
}

type NormalizeFlowExecution struct {
NormalizeFlowState
executionID string
logger log.Logger
}

// NewSyncFlowExecution creates a new instance of SyncFlowExecution.
func NewSyncFlowExecution(ctx workflow.Context, state *SyncFlowState) *SyncFlowExecution {
return &SyncFlowExecution{
Expand All @@ -41,14 +30,6 @@ func NewSyncFlowExecution(ctx workflow.Context, state *SyncFlowState) *SyncFlowE
}
}

func NewNormalizeFlowExecution(ctx workflow.Context, state *NormalizeFlowState) *NormalizeFlowExecution {
return &NormalizeFlowExecution{
NormalizeFlowState: *state,
executionID: workflow.GetInfo(ctx).WorkflowExecution.ID,
logger: workflow.GetLogger(ctx),
}
}

// executeSyncFlow executes the sync flow.
func (s *SyncFlowExecution) executeSyncFlow(
ctx workflow.Context,
Expand Down Expand Up @@ -131,39 +112,3 @@ func SyncFlowWorkflow(ctx workflow.Context,

return s.executeSyncFlow(ctx, config, options, options.RelationMessageMapping)
}

func NormalizeFlowWorkflow(ctx workflow.Context,
config *protos.FlowConnectionConfigs,
) (*model.NormalizeResponse, error) {
s := NewNormalizeFlowExecution(ctx, &NormalizeFlowState{
CDCFlowName: config.FlowJobName,
Progress: []string{},
})

return s.executeNormalizeFlow(ctx, config)
}

func (s *NormalizeFlowExecution) executeNormalizeFlow(
ctx workflow.Context,
config *protos.FlowConnectionConfigs,
) (*model.NormalizeResponse, error) {
s.logger.Info("executing normalize flow - ", s.CDCFlowName)

normalizeFlowCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
StartToCloseTimeout: 7 * 24 * time.Hour,
HeartbeatTimeout: 5 * time.Minute,
})

// execute StartFlow on the peers to start the flow
startNormalizeInput := &protos.StartNormalizeInput{
FlowConnectionConfigs: config,
}
fStartNormalize := workflow.ExecuteActivity(normalizeFlowCtx, flowable.StartNormalize, startNormalizeInput)

var normalizeResponse *model.NormalizeResponse
if err := fStartNormalize.Get(normalizeFlowCtx, &normalizeResponse); err != nil {
return nil, fmt.Errorf("failed to flow: %w", err)
}

return normalizeResponse, nil
}
Loading