From ffbae32ec25935bca3dd1960d80ce1478d81f4ec Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Wed, 27 Dec 2023 22:00:51 +0000 Subject: [PATCH] Use more values by value Channels & maps are internally pointers, so behave as if passed by reference already --- flow/activities/flowable.go | 2 +- flow/activities/snapshot_activity.go | 8 ++-- flow/connectors/postgres/cdc.go | 2 +- flow/connectors/postgres/client.go | 52 +++++++++++-------------- flow/connectors/postgres/postgres.go | 2 +- flow/connectors/postgres/slot_signal.go | 8 ++-- flow/model/model.go | 6 +-- flow/workflows/cdc_flow.go | 6 +-- 8 files changed, 40 insertions(+), 46 deletions(-) diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go index 66a4a2033f..5e82c25bf5 100644 --- a/flow/activities/flowable.go +++ b/flow/activities/flowable.go @@ -36,7 +36,7 @@ type CheckConnectionResult struct { } type SlotSnapshotSignal struct { - signal *connpostgres.SlotSignal + signal connpostgres.SlotSignal snapshotName string connector connectors.CDCPullConnector } diff --git a/flow/activities/snapshot_activity.go b/flow/activities/snapshot_activity.go index 48e86df3c3..13f77bcb8e 100644 --- a/flow/activities/snapshot_activity.go +++ b/flow/activities/snapshot_activity.go @@ -11,7 +11,7 @@ import ( ) type SnapshotActivity struct { - SnapshotConnections map[string]*SlotSnapshotSignal + SnapshotConnections map[string]SlotSnapshotSignal } // closes the slot signal @@ -60,7 +60,7 @@ func (a *SnapshotActivity) SetupReplication( }() slog.InfoContext(ctx, "waiting for slot to be created...") - var slotInfo *connpostgres.SlotCreationResult + var slotInfo connpostgres.SlotCreationResult select { case slotInfo = <-slotSignal.SlotCreated: slog.InfoContext(ctx, fmt.Sprintf("slot '%s' created", slotInfo.SlotName)) @@ -73,10 +73,10 @@ func (a *SnapshotActivity) SetupReplication( } if a.SnapshotConnections == nil { - a.SnapshotConnections = make(map[string]*SlotSnapshotSignal) + a.SnapshotConnections = make(map[string]SlotSnapshotSignal) } - a.SnapshotConnections[config.FlowJobName] = &SlotSnapshotSignal{ + a.SnapshotConnections[config.FlowJobName] = SlotSnapshotSignal{ signal: slotSignal, snapshotName: slotInfo.SnapshotName, connector: conn, diff --git a/flow/connectors/postgres/cdc.go b/flow/connectors/postgres/cdc.go index 05b5a910a7..ff2c9f5898 100644 --- a/flow/connectors/postgres/cdc.go +++ b/flow/connectors/postgres/cdc.go @@ -204,7 +204,7 @@ func (p *PostgresCDCSource) consumeStream( if cdcRecordsStorage.IsEmpty() { records.SignalAsEmpty() } - records.RelationMessageMapping <- &p.relationMessageMapping + records.RelationMessageMapping <- p.relationMessageMapping p.logger.Info(fmt.Sprintf("[finished] PullRecords streamed %d records", cdcRecordsStorage.Len())) err := cdcRecordsStorage.Close() if err != nil { diff --git a/flow/connectors/postgres/client.go b/flow/connectors/postgres/client.go index 304e313f4c..c8720aca26 100644 --- a/flow/connectors/postgres/client.go +++ b/flow/connectors/postgres/client.go @@ -205,7 +205,7 @@ func (c *PostgresConnector) tableExists(schemaTable *utils.SchemaTable) (bool, e } // checkSlotAndPublication checks if the replication slot and publication exist. -func (c *PostgresConnector) checkSlotAndPublication(slot string, publication string) (*SlotCheckResult, error) { +func (c *PostgresConnector) checkSlotAndPublication(slot string, publication string) (SlotCheckResult, error) { slotExists := false publicationExists := false @@ -217,7 +217,7 @@ func (c *PostgresConnector) checkSlotAndPublication(slot string, publication str if err != nil { // check if the error is a "no rows" error if err != pgx.ErrNoRows { - return nil, fmt.Errorf("error checking for replication slot - %s: %w", slot, err) + return SlotCheckResult{}, fmt.Errorf("error checking for replication slot - %s: %w", slot, err) } } else { slotExists = true @@ -231,13 +231,13 @@ func (c *PostgresConnector) checkSlotAndPublication(slot string, publication str if err != nil { // check if the error is a "no rows" error if err != pgx.ErrNoRows { - return nil, fmt.Errorf("error checking for publication - %s: %w", publication, err) + return SlotCheckResult{}, fmt.Errorf("error checking for publication - %s: %w", publication, err) } } else { publicationExists = true } - return &SlotCheckResult{ + return SlotCheckResult{ SlotExists: slotExists, PublicationExists: publicationExists, }, nil @@ -289,8 +289,8 @@ func (c *PostgresConnector) GetSlotInfo(slotName string) ([]*protos.SlotInfo, er // createSlotAndPublication creates the replication slot and publication. func (c *PostgresConnector) createSlotAndPublication( - signal *SlotSignal, - s *SlotCheckResult, + signal SlotSignal, + s SlotCheckResult, slot string, publication string, tableNameMapping map[string]model.NameAndExclude, @@ -346,33 +346,27 @@ func (c *PostgresConnector) createSlotAndPublication( } c.logger.Info(fmt.Sprintf("Created replication slot '%s'", slot)) - if signal != nil { - slotDetails := &SlotCreationResult{ - SlotName: res.SlotName, - SnapshotName: res.SnapshotName, - Err: nil, - } - signal.SlotCreated <- slotDetails - c.logger.Info("Waiting for clone to complete") - <-signal.CloneComplete - c.logger.Info("Clone complete") + slotDetails := SlotCreationResult{ + SlotName: res.SlotName, + SnapshotName: res.SnapshotName, + Err: nil, } + signal.SlotCreated <- slotDetails + c.logger.Info("Waiting for clone to complete") + <-signal.CloneComplete + c.logger.Info("Clone complete") } else { c.logger.Info(fmt.Sprintf("Replication slot '%s' already exists", slot)) - if signal != nil { - var e error - if doInitialCopy { - e = errors.New("slot already exists") - } else { - e = nil - } - slotDetails := &SlotCreationResult{ - SlotName: slot, - SnapshotName: "", - Err: e, - } - signal.SlotCreated <- slotDetails + var e error + if doInitialCopy { + e = errors.New("slot already exists") } + slotDetails := SlotCreationResult{ + SlotName: slot, + SnapshotName: "", + Err: e, + } + signal.SlotCreated <- slotDetails } return nil diff --git a/flow/connectors/postgres/postgres.go b/flow/connectors/postgres/postgres.go index 59a1b835bd..e807aab6e2 100644 --- a/flow/connectors/postgres/postgres.go +++ b/flow/connectors/postgres/postgres.go @@ -774,7 +774,7 @@ func (c *PostgresConnector) EnsurePullability(req *protos.EnsurePullabilityBatch } // SetupReplication sets up replication for the source connector. -func (c *PostgresConnector) SetupReplication(signal *SlotSignal, req *protos.SetupReplicationInput) error { +func (c *PostgresConnector) SetupReplication(signal SlotSignal, req *protos.SetupReplicationInput) error { // ensure that the flowjob name is [a-z0-9_] only reg := regexp.MustCompile(`^[a-z0-9_]+$`) if !reg.MatchString(req.FlowJobName) { diff --git a/flow/connectors/postgres/slot_signal.go b/flow/connectors/postgres/slot_signal.go index 9660575591..1dd9f8d0d7 100644 --- a/flow/connectors/postgres/slot_signal.go +++ b/flow/connectors/postgres/slot_signal.go @@ -10,14 +10,14 @@ type SlotCreationResult struct { // 1. SlotCreated - this can be waited on to ensure that the slot has been created. // 2. CloneComplete - which can be waited on to ensure that the clone has completed. type SlotSignal struct { - SlotCreated chan *SlotCreationResult + SlotCreated chan SlotCreationResult CloneComplete chan struct{} } // NewSlotSignal returns a new SlotSignal. -func NewSlotSignal() *SlotSignal { - return &SlotSignal{ - SlotCreated: make(chan *SlotCreationResult, 1), +func NewSlotSignal() SlotSignal { + return SlotSignal{ + SlotCreated: make(chan SlotCreationResult, 1), CloneComplete: make(chan struct{}, 1), } } diff --git a/flow/model/model.go b/flow/model/model.go index b579ccfb56..fce3246585 100644 --- a/flow/model/model.go +++ b/flow/model/model.go @@ -319,7 +319,7 @@ type CDCRecordStream struct { // Schema changes from the slot SchemaDeltas chan *protos.TableSchemaDelta // Relation message mapping - RelationMessageMapping chan *RelationMessageMapping + RelationMessageMapping chan RelationMessageMapping // Indicates if the last checkpoint has been set. lastCheckpointSet bool // lastCheckPointID is the last ID of the commit that corresponds to this batch. @@ -335,7 +335,7 @@ func NewCDCRecordStream() *CDCRecordStream { // TODO (kaushik): more than 1024 schema deltas can cause problems! SchemaDeltas: make(chan *protos.TableSchemaDelta, 1<<10), emptySignal: make(chan bool, 1), - RelationMessageMapping: make(chan *RelationMessageMapping, 1), + RelationMessageMapping: make(chan RelationMessageMapping, 1), lastCheckpointSet: false, lastCheckPointID: atomic.Int64{}, } @@ -454,7 +454,7 @@ type SyncResponse struct { // to be carried to parent WorkFlow TableSchemaDeltas []*protos.TableSchemaDelta // to be stored in state for future PullFlows - RelationMessageMapping *RelationMessageMapping + RelationMessageMapping RelationMessageMapping } type NormalizeResponse struct { diff --git a/flow/workflows/cdc_flow.go b/flow/workflows/cdc_flow.go index b73f6945fe..cf19147326 100644 --- a/flow/workflows/cdc_flow.go +++ b/flow/workflows/cdc_flow.go @@ -50,7 +50,7 @@ type CDCFlowWorkflowState struct { NormalizeFlowErrors []string // Global mapping of relation IDs to RelationMessages sent as a part of logical replication. // Needed to support schema changes. - RelationMessageMapping *model.RelationMessageMapping + RelationMessageMapping model.RelationMessageMapping } // returns a new empty PeerFlowState @@ -64,7 +64,7 @@ func NewCDCFlowWorkflowState() *CDCFlowWorkflowState { SyncFlowErrors: nil, NormalizeFlowErrors: nil, // WORKAROUND: empty maps are protobufed into nil maps for reasons beyond me - RelationMessageMapping: &model.RelationMessageMapping{ + RelationMessageMapping: model.RelationMessageMapping{ 0: &protos.RelationMessage{ RelationId: 0, RelationName: "protobuf_workaround", @@ -358,7 +358,7 @@ func CDCFlowWorkflowWithConfig( SearchAttributes: mirrorNameSearch, } syncCtx := workflow.WithChildOptions(ctx, childSyncFlowOpts) - syncFlowOptions.RelationMessageMapping = *state.RelationMessageMapping + syncFlowOptions.RelationMessageMapping = state.RelationMessageMapping childSyncFlowFuture := workflow.ExecuteChildWorkflow( syncCtx, SyncFlowWorkflow,