fixes for UI and general
heavycrystal committed Feb 1, 2024
1 parent d85cf33 commit 43a366b
Showing 17 changed files with 57 additions and 58 deletions.
21 changes: 10 additions & 11 deletions flow/activities/flowable.go
@@ -192,21 +192,21 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,
ctx = context.WithValue(ctx, shared.FlowNameKey, input.FlowConnectionConfigs.FlowJobName)
logger := activity.GetLogger(ctx)
activity.RecordHeartbeat(ctx, "starting flow...")
conn := input.FlowConnectionConfigs
dstConn, err := connectors.GetCDCSyncConnector(ctx, conn.Destination)
config := input.FlowConnectionConfigs
dstConn, err := connectors.GetCDCSyncConnector(ctx, config.Destination)
if err != nil {
return nil, fmt.Errorf("failed to get destination connector: %w", err)
}
defer connectors.CloseConnector(dstConn)

logger.Info("pulling records...")
tblNameMapping := make(map[string]model.NameAndExclude)
for _, v := range input.FlowConnectionConfigs.TableMappings {
tblNameMapping := make(map[string]model.NameAndExclude, len(input.SyncFlowOptions.TableMappings))
for _, v := range input.SyncFlowOptions.TableMappings {
tblNameMapping[v.SourceTableIdentifier] = model.NewNameAndExclude(v.DestinationTableIdentifier, v.Exclude)
}

errGroup, errCtx := errgroup.WithContext(ctx)
srcConn, err := connectors.GetCDCPullConnector(errCtx, conn.Source)
srcConn, err := connectors.GetCDCPullConnector(errCtx, config.Source)
if err != nil {
return nil, fmt.Errorf("failed to get source connector: %w", err)
}
@@ -236,9 +236,8 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,
TableNameMapping: tblNameMapping,
LastOffset: input.LastSyncState.Checkpoint,
MaxBatchSize: input.SyncFlowOptions.BatchSize,
IdleTimeout: peerdbenv.PeerDBCDCIdleTimeoutSeconds(
int(input.SyncFlowOptions.IdleTimeoutSeconds),
),
IdleTimeout: time.Duration(input.SyncFlowOptions.IdleTimeoutSeconds) *
time.Second,
TableNameSchemaMapping: input.TableNameSchemaMapping,
OverridePublicationName: input.FlowConnectionConfigs.PublicationName,
OverrideReplicationSlotName: input.FlowConnectionConfigs.ReplicationSlotName,
@@ -251,7 +250,7 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,
})

hasRecords := !recordBatch.WaitAndCheckEmpty()
logger.Info("current sync flow has records?", hasRecords)
logger.Info("current sync flow has records?", slog.Bool("hasRecords", hasRecords))

if !hasRecords {
// wait for the pull goroutine to finish
@@ -275,7 +274,7 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,
}

syncBatchID, err := dstConn.GetLastSyncBatchID(flowName)
if err != nil && conn.Destination.Type != protos.DBType_EVENTHUB {
if err != nil && config.Destination.Type != protos.DBType_EVENTHUB {
return nil, err
}
syncBatchID += 1
@@ -297,7 +296,7 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,
SyncBatchID: syncBatchID,
Records: recordBatch,
FlowJobName: input.FlowConnectionConfigs.FlowJobName,
TableMappings: input.FlowConnectionConfigs.TableMappings,
TableMappings: input.SyncFlowOptions.TableMappings,
StagingPath: input.FlowConnectionConfigs.CdcStagingPath,
})
if err != nil {
4 changes: 2 additions & 2 deletions flow/cmd/handler.go
@@ -188,7 +188,7 @@ func (h *FlowRequestHandler) CreateCDCFlow(
}

return &protos.CreateCDCFlowResponse{
WorflowId: workflowID,
WorkflowId: workflowID,
}, nil
}

@@ -290,7 +290,7 @@ func (h *FlowRequestHandler) CreateQRepFlow(
}

return &protos.CreateQRepFlowResponse{
WorflowId: workflowID,
WorkflowId: workflowID,
}, nil
}

1 change: 1 addition & 0 deletions flow/cmd/mirror_status.go
@@ -94,6 +94,7 @@ func (h *FlowRequestHandler) CDCFlowStatus(
// patching config to show latest values from state
config.IdleTimeoutSeconds = state.SyncFlowOptions.IdleTimeoutSeconds
config.MaxBatchSize = state.SyncFlowOptions.BatchSize
config.TableMappings = state.TableMappings

var initialCopyStatus *protos.SnapshotStatus

3 changes: 0 additions & 3 deletions flow/connectors/postgres/cdc.go
@@ -524,9 +524,6 @@ func (p *PostgresCDCSource) processMessage(batch *model.CDCRecordStream, xld pgl
return nil, nil
}

// TODO (kaushik): consider persistent state for a mirror job
// to be stored somewhere in temporal state. We might need to persist
// the state of the relation message somewhere
p.logger.Debug(fmt.Sprintf("RelationMessage => RelationID: %d, Namespace: %s, RelationName: %s, Columns: %v",
msg.RelationID, msg.Namespace, msg.RelationName, msg.Columns))
if p.relationMessageMapping[msg.RelationID] == nil {
11 changes: 0 additions & 11 deletions flow/peerdbenv/config.go
@@ -29,17 +29,6 @@ func PeerDBEventhubFlushTimeoutSeconds() time.Duration {
return time.Duration(x) * time.Second
}

// PEERDB_CDC_IDLE_TIMEOUT_SECONDS
func PeerDBCDCIdleTimeoutSeconds(providedValue int) time.Duration {
var x int
if providedValue > 0 {
x = providedValue
} else {
x = getEnvInt("PEERDB_CDC_IDLE_TIMEOUT_SECONDS", 60)
}
return time.Duration(x) * time.Second
}

// PEERDB_CDC_DISK_SPILL_THRESHOLD
func PeerDBCDCDiskSpillThreshold() int {
return getEnvInt("PEERDB_CDC_DISK_SPILL_THRESHOLD", 1_000_000)
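With the PEERDB_CDC_IDLE_TIMEOUT_SECONDS helper removed, the idle timeout is now derived directly from the value carried on SyncFlowOptions, as the flowable.go hunk above shows. A minimal standalone sketch of that conversion pattern; the function and variable names below are illustrative, not taken from the codebase:

package main

import (
	"fmt"
	"time"
)

// cdcIdleTimeout mirrors the inline conversion now done in StartFlow:
// the seconds value from SyncFlowOptions becomes a time.Duration directly,
// with no environment-variable fallback.
func cdcIdleTimeout(idleTimeoutSeconds uint64) time.Duration {
	return time.Duration(idleTimeoutSeconds) * time.Second
}

func main() {
	fmt.Println(cdcIdleTimeout(60)) // 1m0s
}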
28 changes: 19 additions & 9 deletions flow/workflows/cdc_flow.go
@@ -57,10 +57,16 @@ type CDCFlowWorkflowState struct {
SyncFlowOptions *protos.SyncFlowOptions
// options passed to all NormalizeFlows
NormalizeFlowOptions *protos.NormalizeFlowOptions
// initially copied from config, all changes are made here though
TableMappings []*protos.TableMapping
}

// returns a new empty PeerFlowState
func NewCDCFlowWorkflowState(numTables int) *CDCFlowWorkflowState {
func NewCDCFlowWorkflowState(cfgTableMappings []*protos.TableMapping) *CDCFlowWorkflowState {
tableMappings := make([]*protos.TableMapping, 0, len(cfgTableMappings))
for _, tableMapping := range cfgTableMappings {
tableMappings = append(tableMappings, proto.Clone(tableMapping).(*protos.TableMapping))
}
return &CDCFlowWorkflowState{
Progress: []string{"started"},
SyncFlowStatuses: nil,
@@ -81,6 +87,7 @@ func NewCDCFlowWorkflowState(numTables int) *CDCFlowWorkflowState {
FlowConfigUpdates: nil,
SyncFlowOptions: nil,
NormalizeFlowOptions: nil,
TableMappings: tableMappings,
}
}

@@ -151,7 +158,7 @@ func (w *CDCFlowWorkflowExecution) processCDCFlowConfigUpdates(ctx workflow.Cont
if len(flowConfigUpdate.AdditionalTables) == 0 {
continue
}
if shared.AdditionalTablesHasOverlap(cfg.TableMappings, flowConfigUpdate.AdditionalTables) {
if shared.AdditionalTablesHasOverlap(state.TableMappings, flowConfigUpdate.AdditionalTables) {
return fmt.Errorf("duplicate source/destination tables found in additionalTables")
}

@@ -173,7 +180,7 @@ func (w *CDCFlowWorkflowExecution) processCDCFlowConfigUpdates(ctx workflow.Cont
additionalTablesWorkflowCfg.TableMappings = flowConfigUpdate.AdditionalTables

childAdditionalTablesCDCFlowID,
err := GetChildWorkflowID(ctx, "cdc-flow", additionalTablesWorkflowCfg.FlowJobName)
err := GetChildWorkflowID(ctx, "additional-cdc-flow", additionalTablesWorkflowCfg.FlowJobName)
if err != nil {
return err
}
@@ -207,7 +214,8 @@ func (w *CDCFlowWorkflowExecution) processCDCFlowConfigUpdates(ctx workflow.Cont
for tableName, tableSchema := range res.TableNameSchemaMapping {
state.TableNameSchemaMapping[tableName] = tableSchema
}
cfg.TableMappings = append(cfg.TableMappings, flowConfigUpdate.AdditionalTables...)
state.TableMappings = append(state.TableMappings, flowConfigUpdate.AdditionalTables...)
state.SyncFlowOptions.TableMappings = state.TableMappings
// finished processing, wipe it
state.FlowConfigUpdates = nil
}
@@ -224,7 +232,7 @@ func CDCFlowWorkflowWithConfig(
return nil, fmt.Errorf("invalid connection configs")
}
if state == nil {
state = NewCDCFlowWorkflowState(len(cfg.TableMappings))
state = NewCDCFlowWorkflowState(cfg.TableMappings)
}

w := NewCDCFlowWorkflowExecution(ctx)
@@ -260,7 +268,7 @@ func CDCFlowWorkflowWithConfig(
// if resync is true, alter the table name schema mapping to temporarily add
// a suffix to the table names.
if cfg.Resync {
for _, mapping := range cfg.TableMappings {
for _, mapping := range state.TableMappings {
oldName := mapping.DestinationTableIdentifier
newName := fmt.Sprintf("%s_resync", oldName)
mapping.DestinationTableIdentifier = newName
@@ -328,7 +336,7 @@ func CDCFlowWorkflowWithConfig(
}
renameOpts.SyncedAtColName = &cfg.SyncedAtColName
correctedTableNameSchemaMapping := make(map[string]*protos.TableSchema)
for _, mapping := range cfg.TableMappings {
for _, mapping := range state.TableMappings {
oldName := mapping.DestinationTableIdentifier
newName := strings.TrimSuffix(oldName, "_resync")
renameOpts.RenameTableOptions = append(renameOpts.RenameTableOptions, &protos.RenameTableOption{
@@ -368,6 +376,7 @@ func CDCFlowWorkflowWithConfig(
IdleTimeoutSeconds: cfg.IdleTimeoutSeconds,
SrcTableIdNameMapping: state.SrcTableIdNameMapping,
TableNameSchemaMapping: state.TableNameSchemaMapping,
TableMappings: state.TableMappings,
}
state.NormalizeFlowOptions = &protos.NormalizeFlowOptions{
TableNameSchemaMapping: state.TableNameSchemaMapping,
@@ -468,7 +477,7 @@ func CDCFlowWorkflowWithConfig(
state.CurrentFlowStatus = protos.FlowStatus_STATUS_PAUSED

for state.ActiveSignal == shared.PauseSignal {
w.logger.Info("mirror has been paused for ", time.Since(startTime))
w.logger.Info("mirror has been paused for ", slog.Any("duration", time.Since(startTime)))
// only place we block on receive, so signal processing is immediate
mainLoopSelector.Select(ctx)
if state.ActiveSignal == shared.NoopSignal {
@@ -539,7 +548,8 @@ func CDCFlowWorkflowWithConfig(
state.SyncFlowStatuses = append(state.SyncFlowStatuses, childSyncFlowRes)
state.RelationMessageMapping = childSyncFlowRes.RelationMessageMapping
totalRecordsSynced += childSyncFlowRes.NumRecordsSynced
w.logger.Info("Total records synced: ", totalRecordsSynced)
w.logger.Info("Total records synced: ",
slog.Int64("totalRecordsSynced", totalRecordsSynced))
}

if childSyncFlowRes != nil {
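The new constructor deep-copies the configured table mappings into workflow state, so later mutations (resync renames, added tables) touch only the state's copy and never the original config. Pulled out as a self-contained sketch of the same pattern; the generated-protos import path is an assumption, not something this diff confirms:

package peerflow

import (
	"google.golang.org/protobuf/proto"

	"github.com/PeerDB-io/peer-flow/generated/protos" // assumed import path
)

// cloneTableMappings returns deep copies of the configured mappings, the same
// pattern NewCDCFlowWorkflowState uses to seed state.TableMappings.
func cloneTableMappings(cfgTableMappings []*protos.TableMapping) []*protos.TableMapping {
	tableMappings := make([]*protos.TableMapping, 0, len(cfgTableMappings))
	for _, tm := range cfgTableMappings {
		// proto.Clone returns a proto.Message, so assert back to the concrete type.
		tableMappings = append(tableMappings, proto.Clone(tm).(*protos.TableMapping))
	}
	return tableMappings
}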
3 changes: 2 additions & 1 deletion flow/workflows/normalize_flow.go
@@ -2,6 +2,7 @@ package peerflow

import (
"fmt"
"log/slog"
"time"

"go.temporal.io/sdk/workflow"
@@ -65,7 +66,7 @@ func NormalizeFlowWorkflow(ctx workflow.Context,
if lastSyncBatchID != syncBatchID {
lastSyncBatchID = syncBatchID

logger.Info("executing normalize - ", config.FlowJobName)
logger.Info("executing normalize - ", slog.String("flowName", config.FlowJobName))
startNormalizeInput := &protos.StartNormalizeInput{
FlowConnectionConfigs: config,
TableNameSchemaMapping: tableNameSchemaMapping,
5 changes: 3 additions & 2 deletions flow/workflows/qrep_flow.go
@@ -3,6 +3,7 @@ package peerflow

import (
"fmt"
"log/slog"
"strings"
"time"

@@ -471,7 +472,7 @@ func QRepFlowWorkflow(
return err
}

logger.Info("consolidating partitions for peer flow - ", config.FlowJobName)
logger.Info("consolidating partitions for peer flow - ", slog.String("flowName", config.FlowJobName))
if err = q.consolidatePartitions(ctx); err != nil {
return err
}
@@ -515,7 +516,7 @@ func QRepFlowWorkflow(
var signalVal shared.CDCFlowSignal

for q.activeSignal == shared.PauseSignal {
q.logger.Info("mirror has been paused for ", time.Since(startTime))
q.logger.Info("mirror has been paused for ", slog.Any("duration", time.Since(startTime)))
// only place we block on receive, so signal processing is immediate
ok, _ := signalChan.ReceiveWithTimeout(ctx, 1*time.Minute, &signalVal)
if ok {
3 changes: 2 additions & 1 deletion flow/workflows/sync_flow.go
@@ -2,6 +2,7 @@ package peerflow

import (
"fmt"
"log/slog"
"time"

"go.temporal.io/sdk/log"
@@ -38,7 +39,7 @@ func (s *SyncFlowExecution) executeSyncFlow(
opts *protos.SyncFlowOptions,
relationMessageMapping model.RelationMessageMapping,
) (*model.SyncResponse, error) {
s.logger.Info("executing sync flow - ", s.CDCFlowName)
s.logger.Info("executing sync flow - ", slog.String("flowName", s.CDCFlowName))

syncMetaCtx := workflow.WithLocalActivityOptions(ctx, workflow.LocalActivityOptions{
StartToCloseTimeout: 1 * time.Minute,
3 changes: 2 additions & 1 deletion flow/workflows/xmin_flow.go
@@ -3,6 +3,7 @@ package peerflow

import (
"fmt"
"log/slog"
"time"

"github.com/google/uuid"
@@ -100,7 +101,7 @@ func XminFlowWorkflow(
var signalVal shared.CDCFlowSignal

for q.activeSignal == shared.PauseSignal {
q.logger.Info("mirror has been paused for ", time.Since(startTime))
q.logger.Info("mirror has been paused for ", slog.Any("duration", time.Since(startTime)))
// only place we block on receive, so signal processing is immediate
ok, _ := signalChan.ReceiveWithTimeout(ctx, 1*time.Minute, &signalVal)
if ok {
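Several hunks above replace loosely typed logger arguments with log/slog attributes, so values are logged as structured key/value pairs instead of being dropped or mis-formatted. A minimal sketch of that pattern against the standard-library logger (the workflow and activity loggers in these files accept the same attribute values, as the diffs show):

package main

import (
	"log/slog"
	"time"
)

func main() {
	logger := slog.Default()
	start := time.Now()

	hasRecords := true
	var totalRecordsSynced int64 = 42

	// Typed attributes keep each value attached to a named key in the output.
	logger.Info("current sync flow has records?", slog.Bool("hasRecords", hasRecords))
	logger.Info("Total records synced: ", slog.Int64("totalRecordsSynced", totalRecordsSynced))
	logger.Info("mirror has been paused for ", slog.Any("duration", time.Since(start)))
}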
4 changes: 2 additions & 2 deletions nexus/flow-rs/src/grpc.rs
@@ -51,7 +51,7 @@ impl FlowGrpcClient {
create_catalog_entry: false,
};
let response = self.client.create_q_rep_flow(create_qrep_flow_req).await?;
let workflow_id = response.into_inner().worflow_id;
let workflow_id = response.into_inner().workflow_id;
Ok(workflow_id)
}

@@ -82,7 +82,7 @@ impl FlowGrpcClient {
create_catalog_entry: false,
};
let response = self.client.create_cdc_flow(create_peer_flow_req).await?;
let workflow_id = response.into_inner().worflow_id;
let workflow_id = response.into_inner().workflow_id;
Ok(workflow_id)
}

13 changes: 6 additions & 7 deletions protos/flow.proto
@@ -42,7 +42,7 @@ message FlowConnectionConfigs {
peerdb_peers.Peer destination = 3;

// config for the CDC flow itself
// currently, MaxBatchSize and IdleTimeoutSeconds are dynamic via Temporal signals
// currently, TableMappings, MaxBatchSize and IdleTimeoutSeconds are dynamic via Temporal signals
repeated TableMapping table_mappings = 4;
uint32 max_batch_size = 5;
uint64 idle_timeout_seconds = 6;
@@ -103,6 +103,7 @@ message SyncFlowOptions {
uint64 idle_timeout_seconds = 3;
map<uint32, string> src_table_id_name_mapping = 4;
map<string, TableSchema> table_name_schema_mapping = 5;
repeated TableMapping table_mappings = 6;
}

message NormalizeFlowOptions {
@@ -363,22 +364,20 @@ message GetOpenConnectionsForUserResult {
// STATUS_RUNNING -> STATUS_PAUSED/STATUS_TERMINATED
// STATUS_PAUSED -> STATUS_RUNNING/STATUS_TERMINATED
// UI can read everything except STATUS_UNKNOWN
// terminate button should always be enabled
enum FlowStatus {
// should never be read by UI, bail
STATUS_UNKNOWN = 0;
// enable pause and terminate buttons
STATUS_RUNNING = 1;
// pause button becomes resume button, terminate button should still be enabled
// pause button becomes resume button
STATUS_PAUSED = 2;
// neither button should be enabled
STATUS_PAUSING = 3;
// neither button should be enabled, not reachable in QRep mirrors
// not reachable in QRep mirrors
STATUS_SETUP = 4;
// neither button should be enabled, not reachable in QRep mirrors
// not reachable in QRep mirrors
STATUS_SNAPSHOT = 5;
// neither button should be enabled
STATUS_TERMINATING = 6;
// neither button should be enabled
STATUS_TERMINATED = 7;
}

4 changes: 2 additions & 2 deletions protos/route.proto
@@ -14,7 +14,7 @@ message CreateCDCFlowRequest {
}

message CreateCDCFlowResponse {
string worflow_id = 1;
string workflow_id = 1;
}

message CreateQRepFlowRequest {
@@ -23,7 +23,7 @@ message CreateQRepFlowRequest {
}

message CreateQRepFlowResponse {
string worflow_id = 1;
string workflow_id = 1;
}

message ShutdownRequest {
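The worflow_id -> workflow_id rename flows through the generated code on both sides: the Go struct field becomes WorkflowId (handler.go above) and the Rust accessor becomes workflow_id (grpc.rs above). A tiny Go sketch of the corrected field; the generated-protos import path is assumed:

package main

import (
	"fmt"

	"github.com/PeerDB-io/peer-flow/generated/protos" // assumed import path
)

func main() {
	resp := &protos.CreateCDCFlowResponse{WorkflowId: "example-cdc-flow"}
	fmt.Println(resp.WorkflowId) // previously the misspelled WorflowId
}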