Skip to content

Commit

Permalink
Use more values by value
Browse files Browse the repository at this point in the history
Channels & maps are internally pointers,
so behave as if passed by reference already
  • Loading branch information
serprex committed Dec 27, 2023
1 parent c9e5e90 commit ffbae32
Show file tree
Hide file tree
Showing 8 changed files with 40 additions and 46 deletions.
2 changes: 1 addition & 1 deletion flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ type CheckConnectionResult struct {
}

type SlotSnapshotSignal struct {
signal *connpostgres.SlotSignal
signal connpostgres.SlotSignal
snapshotName string
connector connectors.CDCPullConnector
}
Expand Down
8 changes: 4 additions & 4 deletions flow/activities/snapshot_activity.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
)

type SnapshotActivity struct {
SnapshotConnections map[string]*SlotSnapshotSignal
SnapshotConnections map[string]SlotSnapshotSignal
}

// closes the slot signal
Expand Down Expand Up @@ -60,7 +60,7 @@ func (a *SnapshotActivity) SetupReplication(
}()

slog.InfoContext(ctx, "waiting for slot to be created...")
var slotInfo *connpostgres.SlotCreationResult
var slotInfo connpostgres.SlotCreationResult
select {
case slotInfo = <-slotSignal.SlotCreated:
slog.InfoContext(ctx, fmt.Sprintf("slot '%s' created", slotInfo.SlotName))
Expand All @@ -73,10 +73,10 @@ func (a *SnapshotActivity) SetupReplication(
}

if a.SnapshotConnections == nil {
a.SnapshotConnections = make(map[string]*SlotSnapshotSignal)
a.SnapshotConnections = make(map[string]SlotSnapshotSignal)
}

a.SnapshotConnections[config.FlowJobName] = &SlotSnapshotSignal{
a.SnapshotConnections[config.FlowJobName] = SlotSnapshotSignal{
signal: slotSignal,
snapshotName: slotInfo.SnapshotName,
connector: conn,
Expand Down
2 changes: 1 addition & 1 deletion flow/connectors/postgres/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ func (p *PostgresCDCSource) consumeStream(
if cdcRecordsStorage.IsEmpty() {
records.SignalAsEmpty()
}
records.RelationMessageMapping <- &p.relationMessageMapping
records.RelationMessageMapping <- p.relationMessageMapping
p.logger.Info(fmt.Sprintf("[finished] PullRecords streamed %d records", cdcRecordsStorage.Len()))
err := cdcRecordsStorage.Close()
if err != nil {
Expand Down
52 changes: 23 additions & 29 deletions flow/connectors/postgres/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ func (c *PostgresConnector) tableExists(schemaTable *utils.SchemaTable) (bool, e
}

// checkSlotAndPublication checks if the replication slot and publication exist.
func (c *PostgresConnector) checkSlotAndPublication(slot string, publication string) (*SlotCheckResult, error) {
func (c *PostgresConnector) checkSlotAndPublication(slot string, publication string) (SlotCheckResult, error) {
slotExists := false
publicationExists := false

Expand All @@ -217,7 +217,7 @@ func (c *PostgresConnector) checkSlotAndPublication(slot string, publication str
if err != nil {
// check if the error is a "no rows" error
if err != pgx.ErrNoRows {
return nil, fmt.Errorf("error checking for replication slot - %s: %w", slot, err)
return SlotCheckResult{}, fmt.Errorf("error checking for replication slot - %s: %w", slot, err)
}
} else {
slotExists = true
Expand All @@ -231,13 +231,13 @@ func (c *PostgresConnector) checkSlotAndPublication(slot string, publication str
if err != nil {
// check if the error is a "no rows" error
if err != pgx.ErrNoRows {
return nil, fmt.Errorf("error checking for publication - %s: %w", publication, err)
return SlotCheckResult{}, fmt.Errorf("error checking for publication - %s: %w", publication, err)
}
} else {
publicationExists = true
}

return &SlotCheckResult{
return SlotCheckResult{
SlotExists: slotExists,
PublicationExists: publicationExists,
}, nil
Expand Down Expand Up @@ -289,8 +289,8 @@ func (c *PostgresConnector) GetSlotInfo(slotName string) ([]*protos.SlotInfo, er

// createSlotAndPublication creates the replication slot and publication.
func (c *PostgresConnector) createSlotAndPublication(
signal *SlotSignal,
s *SlotCheckResult,
signal SlotSignal,
s SlotCheckResult,
slot string,
publication string,
tableNameMapping map[string]model.NameAndExclude,
Expand Down Expand Up @@ -346,33 +346,27 @@ func (c *PostgresConnector) createSlotAndPublication(
}

c.logger.Info(fmt.Sprintf("Created replication slot '%s'", slot))
if signal != nil {
slotDetails := &SlotCreationResult{
SlotName: res.SlotName,
SnapshotName: res.SnapshotName,
Err: nil,
}
signal.SlotCreated <- slotDetails
c.logger.Info("Waiting for clone to complete")
<-signal.CloneComplete
c.logger.Info("Clone complete")
slotDetails := SlotCreationResult{
SlotName: res.SlotName,
SnapshotName: res.SnapshotName,
Err: nil,
}
signal.SlotCreated <- slotDetails
c.logger.Info("Waiting for clone to complete")
<-signal.CloneComplete
c.logger.Info("Clone complete")
} else {
c.logger.Info(fmt.Sprintf("Replication slot '%s' already exists", slot))
if signal != nil {
var e error
if doInitialCopy {
e = errors.New("slot already exists")
} else {
e = nil
}
slotDetails := &SlotCreationResult{
SlotName: slot,
SnapshotName: "",
Err: e,
}
signal.SlotCreated <- slotDetails
var e error
if doInitialCopy {
e = errors.New("slot already exists")
}
slotDetails := SlotCreationResult{
SlotName: slot,
SnapshotName: "",
Err: e,
}
signal.SlotCreated <- slotDetails
}

return nil
Expand Down
2 changes: 1 addition & 1 deletion flow/connectors/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -774,7 +774,7 @@ func (c *PostgresConnector) EnsurePullability(req *protos.EnsurePullabilityBatch
}

// SetupReplication sets up replication for the source connector.
func (c *PostgresConnector) SetupReplication(signal *SlotSignal, req *protos.SetupReplicationInput) error {
func (c *PostgresConnector) SetupReplication(signal SlotSignal, req *protos.SetupReplicationInput) error {
// ensure that the flowjob name is [a-z0-9_] only
reg := regexp.MustCompile(`^[a-z0-9_]+$`)
if !reg.MatchString(req.FlowJobName) {
Expand Down
8 changes: 4 additions & 4 deletions flow/connectors/postgres/slot_signal.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,14 @@ type SlotCreationResult struct {
// 1. SlotCreated - this can be waited on to ensure that the slot has been created.
// 2. CloneComplete - which can be waited on to ensure that the clone has completed.
type SlotSignal struct {
SlotCreated chan *SlotCreationResult
SlotCreated chan SlotCreationResult
CloneComplete chan struct{}
}

// NewSlotSignal returns a new SlotSignal.
func NewSlotSignal() *SlotSignal {
return &SlotSignal{
SlotCreated: make(chan *SlotCreationResult, 1),
func NewSlotSignal() SlotSignal {
return SlotSignal{
SlotCreated: make(chan SlotCreationResult, 1),
CloneComplete: make(chan struct{}, 1),
}
}
6 changes: 3 additions & 3 deletions flow/model/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,7 @@ type CDCRecordStream struct {
// Schema changes from the slot
SchemaDeltas chan *protos.TableSchemaDelta
// Relation message mapping
RelationMessageMapping chan *RelationMessageMapping
RelationMessageMapping chan RelationMessageMapping
// Indicates if the last checkpoint has been set.
lastCheckpointSet bool
// lastCheckPointID is the last ID of the commit that corresponds to this batch.
Expand All @@ -335,7 +335,7 @@ func NewCDCRecordStream() *CDCRecordStream {
// TODO (kaushik): more than 1024 schema deltas can cause problems!
SchemaDeltas: make(chan *protos.TableSchemaDelta, 1<<10),
emptySignal: make(chan bool, 1),
RelationMessageMapping: make(chan *RelationMessageMapping, 1),
RelationMessageMapping: make(chan RelationMessageMapping, 1),
lastCheckpointSet: false,
lastCheckPointID: atomic.Int64{},
}
Expand Down Expand Up @@ -454,7 +454,7 @@ type SyncResponse struct {
// to be carried to parent WorkFlow
TableSchemaDeltas []*protos.TableSchemaDelta
// to be stored in state for future PullFlows
RelationMessageMapping *RelationMessageMapping
RelationMessageMapping RelationMessageMapping
}

type NormalizeResponse struct {
Expand Down
6 changes: 3 additions & 3 deletions flow/workflows/cdc_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ type CDCFlowWorkflowState struct {
NormalizeFlowErrors []string
// Global mapping of relation IDs to RelationMessages sent as a part of logical replication.
// Needed to support schema changes.
RelationMessageMapping *model.RelationMessageMapping
RelationMessageMapping model.RelationMessageMapping
}

// returns a new empty PeerFlowState
Expand All @@ -64,7 +64,7 @@ func NewCDCFlowWorkflowState() *CDCFlowWorkflowState {
SyncFlowErrors: nil,
NormalizeFlowErrors: nil,
// WORKAROUND: empty maps are protobufed into nil maps for reasons beyond me
RelationMessageMapping: &model.RelationMessageMapping{
RelationMessageMapping: model.RelationMessageMapping{
0: &protos.RelationMessage{
RelationId: 0,
RelationName: "protobuf_workaround",
Expand Down Expand Up @@ -358,7 +358,7 @@ func CDCFlowWorkflowWithConfig(
SearchAttributes: mirrorNameSearch,
}
syncCtx := workflow.WithChildOptions(ctx, childSyncFlowOpts)
syncFlowOptions.RelationMessageMapping = *state.RelationMessageMapping
syncFlowOptions.RelationMessageMapping = state.RelationMessageMapping
childSyncFlowFuture := workflow.ExecuteChildWorkflow(
syncCtx,
SyncFlowWorkflow,
Expand Down

0 comments on commit ffbae32

Please sign in to comment.