
Commit 7043638

fixed more review comments

heavycrystal committed Jan 17, 2024
1 parent 769870f commit 7043638
Showing 5 changed files with 52 additions and 100 deletions.
25 changes: 21 additions & 4 deletions flow/connectors/postgres/postgres.go
@@ -793,7 +793,7 @@ func (c *PostgresConnector) EnsurePullability(
 	}
 
 	if !req.CheckConstraints {
-		msg := fmt.Sprintf("[no-constriants] ensured pullability table %s", tableName)
+		msg := fmt.Sprintf("[no-constraints] ensured pullability table %s", tableName)
 		utils.RecordHeartbeatWithRecover(c.ctx, msg)
 		continue
 	}
@@ -809,7 +809,7 @@ func (c *PostgresConnector) EnsurePullability(
 	}
 
 	// we only allow no primary key if the table has REPLICA IDENTITY FULL
-	// this is ok for replica identity idex as we populate the primary key columns
+	// this is ok for replica identity index as we populate the primary key columns
 	if len(pKeyCols) == 0 && !(replicaIdentity == ReplicaIdentityFull) {
 		return nil, fmt.Errorf("table %s has no primary keys and does not have REPLICA IDENTITY FULL", schemaTable)
 	}
@@ -947,16 +947,33 @@ func (c *PostgresConnector) GetOpenConnectionsForUser() (*protos.GetOpenConnecti
 
 func (c *PostgresConnector) AddTablesToPublication(req *protos.AddTablesToPublicationInput) error {
 	// don't modify custom publications
-	if req == nil || req.PublicationName != "" || len(req.AdditionalTables) == 0 {
+	if req == nil || len(req.AdditionalTables) == 0 {
 		return nil
 	}
 
 	additionalSrcTables := make([]string, 0, len(req.AdditionalTables))
 	for _, additionalTableMapping := range req.AdditionalTables {
 		additionalSrcTables = append(additionalSrcTables, additionalTableMapping.SourceTableIdentifier)
 	}
-	additionalSrcTablesString := strings.Join(additionalSrcTables, ",")
 
+	// just check if we have all the tables already in the publication
+	if req.PublicationName != "" {
+		rows, err := c.pool.Query(c.ctx,
+			"SELECT tablename FROM pg_publication_tables WHERE pubname=$1", req.PublicationName)
+		if err != nil {
+			return fmt.Errorf("failed to check tables in publication: %w", err)
+		}
+
+		tableNames, err := pgx.CollectRows[string](rows, pgx.RowTo)
+		if err != nil {
+			return fmt.Errorf("failed to check tables in publication: %w", err)
+		}
+		if len(utils.ArrayMinus(tableNames, additionalSrcTables)) > 0 {
+			return fmt.Errorf("some additional tables not present in custom publication")
+		}
+	}
+
+	additionalSrcTablesString := strings.Join(additionalSrcTables, ",")
 	_, err := c.pool.Exec(c.ctx, fmt.Sprintf("ALTER PUBLICATION %s ADD TABLE %s",
 		c.getDefaultPublicationName(req.FlowJobName), additionalSrcTablesString))
 	if err != nil {
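
Note (editor, not part of the diff): the new block verifies that a custom publication already contains every requested source table. Below is a minimal, self-contained sketch of that membership check, assuming pgx v5; DATABASE_URL, the publication name, and the table names are placeholders, and setMinus is a local stand-in for utils.ArrayMinus. The committed code calls ArrayMinus(tableNames, additionalSrcTables), while the error message reads as the opposite direction, requested minus published, which is what this sketch computes.

package main

import (
	"context"
	"fmt"
	"log"
	"os"

	"github.com/jackc/pgx/v5"
)

// setMinus returns the elements of first that are absent from second
// (first - second), matching the documented utils.ArrayMinus semantics.
func setMinus(first, second []string) []string {
	lookup := make(map[string]struct{}, len(second))
	for _, s := range second {
		lookup[s] = struct{}{}
	}
	var out []string
	for _, f := range first {
		if _, ok := lookup[f]; !ok {
			out = append(out, f)
		}
	}
	return out
}

func main() {
	ctx := context.Background()
	conn, err := pgx.Connect(ctx, os.Getenv("DATABASE_URL"))
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close(ctx)

	// Same catalog query the connector issues to list a publication's tables.
	rows, err := conn.Query(ctx,
		"SELECT tablename FROM pg_publication_tables WHERE pubname=$1", "my_publication")
	if err != nil {
		log.Fatal(err)
	}
	published, err := pgx.CollectRows(rows, pgx.RowTo[string])
	if err != nil {
		log.Fatal(err)
	}

	// Requested tables that are not yet published.
	requested := []string{"users", "orders"}
	if missing := setMinus(requested, published); len(missing) > 0 {
		fmt.Println("not in publication:", missing)
	}
}
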
5 changes: 3 additions & 2 deletions flow/connectors/utils/array.go
@@ -1,5 +1,6 @@
 package utils
 
+// first - second
 func ArrayMinus[T comparable](first, second []T) []T {
 	lookup := make(map[T]struct{}, len(second))
 	// Add elements from arrayB to the lookup map
@@ -33,11 +34,11 @@ func ArrayChunks[T any](slice []T, size int) [][]T {
 func ArraysHaveOverlap[T comparable](first, second []T) bool {
 	lookup := make(map[T]struct{})
 
-	for _, element := range first {
+	for _, element := range second {
 		lookup[element] = struct{}{}
 	}
 
-	for _, element := range second {
+	for _, element := range first {
 		if _, exists := lookup[element]; exists {
 			return true
 		}
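
Note (editor, not part of the diff): a runnable sketch of the two helpers as they stand after this commit, reconstructed from the visible context; the unchanged middle of ArrayMinus is assumed. The loop swap in ArraysHaveOverlap does not change its result, it only changes which slice backs the lookup map.

package main

import "fmt"

// arrayMinus mirrors utils.ArrayMinus: "first - second", i.e. the
// elements of first that do not appear in second.
func arrayMinus[T comparable](first, second []T) []T {
	lookup := make(map[T]struct{}, len(second))
	for _, element := range second {
		lookup[element] = struct{}{}
	}
	var diff []T
	for _, element := range first {
		if _, exists := lookup[element]; !exists {
			diff = append(diff, element)
		}
	}
	return diff
}

// arraysHaveOverlap mirrors utils.ArraysHaveOverlap after the swap above:
// build the lookup from second, then probe it with elements of first.
func arraysHaveOverlap[T comparable](first, second []T) bool {
	lookup := make(map[T]struct{})
	for _, element := range second {
		lookup[element] = struct{}{}
	}
	for _, element := range first {
		if _, exists := lookup[element]; exists {
			return true
		}
	}
	return false
}

func main() {
	fmt.Println(arrayMinus([]string{"a", "b", "c"}, []string{"b"}))        // [a c]
	fmt.Println(arraysHaveOverlap([]string{"a", "b"}, []string{"b", "x"})) // true
}
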
36 changes: 5 additions & 31 deletions flow/workflows/cdc_flow.go
@@ -6,7 +6,6 @@ import (
 	"strings"
 	"time"
 
-	"github.com/PeerDB-io/peer-flow/connectors/utils"
 	"github.com/PeerDB-io/peer-flow/generated/protos"
 	"github.com/PeerDB-io/peer-flow/model"
 	"github.com/PeerDB-io/peer-flow/shared"
@@ -55,10 +54,6 @@ type CDCFlowWorkflowState struct {
 	TableNameSchemaMapping map[string]*protos.TableSchema
 	// flow config update request, set to nil after processed
 	FlowConfigUpdates []*protos.CDCFlowConfigUpdate
-	// maintaining a copy of SrcTableIdNameMapping and TableNameSchemaMapping from protos.FlowConnectionConfigs
-	// ideally it shouldn't even be in the config, since it is set dynamically in SetupFlow and should be only in state
-	SrcTableIdNameMapping map[uint32]string
-	TableNameSchemaMapping map[string]*protos.TableSchema
 }

type SignalProps struct {
@@ -158,27 +153,6 @@ func (w *CDCFlowWorkflowExecution) receiveAndHandleSignalAsync(ctx workflow.Cont
 	}
 }
 
-func additionalTablesHasOverlap(currentTableMappings []*protos.TableMapping,
-	additionalTableMappings []*protos.TableMapping,
-) bool {
-	currentSrcTables := make([]string, 0, len(currentTableMappings))
-	currentDstTables := make([]string, 0, len(currentTableMappings))
-	additionalSrcTables := make([]string, 0, len(additionalTableMappings))
-	additionalDstTables := make([]string, 0, len(additionalTableMappings))
-
-	for _, currentTableMapping := range currentTableMappings {
-		currentSrcTables = append(currentSrcTables, currentTableMapping.SourceTableIdentifier)
-		currentDstTables = append(currentDstTables, currentTableMapping.DestinationTableIdentifier)
-	}
-	for _, additionalTableMapping := range additionalTableMappings {
-		currentSrcTables = append(currentSrcTables, additionalTableMapping.SourceTableIdentifier)
-		currentDstTables = append(currentDstTables, additionalTableMapping.DestinationTableIdentifier)
-	}
-
-	return utils.ArraysHaveOverlap[string](currentSrcTables, additionalSrcTables) ||
-		utils.ArraysHaveOverlap[string](currentDstTables, additionalDstTables)
-}
-
 func (w *CDCFlowWorkflowExecution) processCDCFlowConfigUpdates(ctx workflow.Context,
 	cfg *protos.FlowConnectionConfigs, state *CDCFlowWorkflowState,
 	limits *CDCFlowLimits, mirrorNameSearch *map[string]interface{},
@@ -187,7 +161,7 @@ func (w *CDCFlowWorkflowExecution) processCDCFlowConfigUpdates(ctx workflow.Cont
 		if len(flowConfigUpdate.AdditionalTables) == 0 {
 			continue
 		}
-		if additionalTablesHasOverlap(cfg.TableMappings, flowConfigUpdate.AdditionalTables) {
+		if shared.AdditionalTablesHasOverlap(cfg.TableMappings, flowConfigUpdate.AdditionalTables) {
 			return fmt.Errorf("duplicate source/destination tables found in additionalTables")
 		}
@@ -204,8 +178,8 @@ func (w *CDCFlowWorkflowExecution) processCDCFlowConfigUpdates(ctx workflow.Cont
 		}
 
 		additionalTablesWorkflowCfg := proto.Clone(cfg).(*protos.FlowConnectionConfigs)
-		additionalTablesWorkflowCfg.DoInitialCopy = true
-		additionalTablesWorkflowCfg.InitialCopyOnly = true
+		additionalTablesWorkflowCfg.DoInitialSnapshot = true
+		additionalTablesWorkflowCfg.InitialSnapshotOnly = true
 		additionalTablesWorkflowCfg.TableMappings = flowConfigUpdate.AdditionalTables
 		additionalTablesWorkflowCfg.FlowJobName = fmt.Sprintf("%s_additional_tables_%s", cfg.FlowJobName,
 			strings.ToLower(shared.RandomString(8)))
@@ -240,10 +214,10 @@ func (w *CDCFlowWorkflowExecution) processCDCFlowConfigUpdates(ctx workflow.Cont
 		}
 
 		for tableID, tableName := range res.SrcTableIdNameMapping {
-			cfg.SrcTableIdNameMapping[tableID] = tableName
+			state.SrcTableIdNameMapping[tableID] = tableName
 		}
 		for tableName, tableSchema := range res.TableNameSchemaMapping {
-			cfg.TableNameSchemaMapping[tableName] = tableSchema
+			state.TableNameSchemaMapping[tableName] = tableSchema
 		}
 		cfg.TableMappings = append(cfg.TableMappings, flowConfigUpdate.AdditionalTables...)
 		// finished processing, wipe it
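
Note (editor, not part of the diff): the deleted local helper had a bug: its second loop appended to currentSrcTables and currentDstTables, so additionalSrcTables and additionalDstTables stayed empty and the overlap check always returned false. The shared.AdditionalTablesHasOverlap now called here is not shown in this commit; below is a plausible corrected shape reconstructed from the removed code, with the package location and imports being assumptions.

package shared

import (
	"github.com/PeerDB-io/peer-flow/connectors/utils"
	"github.com/PeerDB-io/peer-flow/generated/protos"
)

// AdditionalTablesHasOverlap reports whether any source or destination
// table in additionalTableMappings already appears in currentTableMappings.
func AdditionalTablesHasOverlap(currentTableMappings []*protos.TableMapping,
	additionalTableMappings []*protos.TableMapping,
) bool {
	currentSrcTables := make([]string, 0, len(currentTableMappings))
	currentDstTables := make([]string, 0, len(currentTableMappings))
	additionalSrcTables := make([]string, 0, len(additionalTableMappings))
	additionalDstTables := make([]string, 0, len(additionalTableMappings))

	for _, currentTableMapping := range currentTableMappings {
		currentSrcTables = append(currentSrcTables, currentTableMapping.SourceTableIdentifier)
		currentDstTables = append(currentDstTables, currentTableMapping.DestinationTableIdentifier)
	}
	for _, additionalTableMapping := range additionalTableMappings {
		// the fix: fill the additional-table slices instead of the current ones
		additionalSrcTables = append(additionalSrcTables, additionalTableMapping.SourceTableIdentifier)
		additionalDstTables = append(additionalDstTables, additionalTableMapping.DestinationTableIdentifier)
	}

	return utils.ArraysHaveOverlap(currentSrcTables, additionalSrcTables) ||
		utils.ArraysHaveOverlap(currentDstTables, additionalDstTables)
}
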
2 changes: 1 addition & 1 deletion nexus/flow-rs/src/grpc.rs
@@ -114,7 +114,7 @@ impl FlowGrpcClient {
             requested_flow_state: state.into(),
             source_peer: Some(workflow_details.source_peer),
             destination_peer: Some(workflow_details.destination_peer),
-            flow_config_update
+            flow_config_update,
         };
         let response = self.client.flow_state_change(state_change_req).await?;
         let state_change_response = response.into_inner();
84 changes: 22 additions & 62 deletions nexus/server/src/main.rs
@@ -66,11 +66,7 @@ impl AuthSource for FixedPasswordAuthSource {
         // randomly generate a 4 byte salt
         let salt = rand::thread_rng().gen::<[u8; 4]>();
         let password = &self.password;
-        let hash_password = hash_md5_password(
-            login_info.user().unwrap_or(""),
-            password,
-            &salt,
-        );
+        let hash_password = hash_md5_password(login_info.user().unwrap_or(""), password, &salt);
         Ok(Password::new(
             Some(salt.to_vec()),
             hash_password.as_bytes().to_vec(),
@@ -116,9 +112,9 @@ impl NexusBackend {
     ) -> PgWireResult<Vec<Response<'a>>> {
         let res = executor.execute(stmt).await?;
         match res {
-            QueryOutput::AffectedRows(rows) => Ok(vec![Response::Execution(
-                Tag::new("OK").with_rows(rows),
-            )]),
+            QueryOutput::AffectedRows(rows) => {
+                Ok(vec![Response::Execution(Tag::new("OK").with_rows(rows))])
+            }
             QueryOutput::Stream(rows) => {
                 let schema = rows.schema();
                 let res = sendable_stream_to_query_response(schema, rows)?;
@@ -134,17 +130,13 @@
         match cm {
             peer_cursor::CursorModification::Created(cursor_name) => {
                 peer_cursors.add_cursor(cursor_name, peer_holder.unwrap());
-                Ok(vec![Response::Execution(Tag::new(
-                    "DECLARE CURSOR",
-                ))])
+                Ok(vec![Response::Execution(Tag::new("DECLARE CURSOR"))])
             }
             peer_cursor::CursorModification::Closed(cursors) => {
                 for cursor_name in cursors {
                     peer_cursors.remove_cursor(&cursor_name);
                 }
-                Ok(vec![Response::Execution(Tag::new(
-                    "CLOSE CURSOR",
-                ))])
+                Ok(vec![Response::Execution(Tag::new("CLOSE CURSOR"))])
             }
         }
     }
@@ -187,9 +179,7 @@
     ) -> PgWireResult<Vec<Response<'static>>> {
         if if_not_exists {
             let existing_mirror_success = "MIRROR ALREADY EXISTS";
-            Ok(vec![Response::Execution(Tag::new(
-                existing_mirror_success,
-            ))])
+            Ok(vec![Response::Execution(Tag::new(existing_mirror_success))])
         } else {
             Err(PgWireError::UserError(Box::new(ErrorInfo::new(
                 "ERROR".to_owned(),
@@ -289,14 +279,10 @@
                             )
                         })?;
                     let drop_mirror_success = format!("DROP MIRROR {}", flow_job_name);
-                    Ok(vec![Response::Execution(Tag::new(
-                        &drop_mirror_success,
-                    ))])
+                    Ok(vec![Response::Execution(Tag::new(&drop_mirror_success))])
                 } else if *if_exists {
                     let no_mirror_success = "NO SUCH MIRROR";
-                    Ok(vec![Response::Execution(Tag::new(
-                        no_mirror_success,
-                    ))])
+                    Ok(vec![Response::Execution(Tag::new(no_mirror_success))])
                 } else {
                     Err(PgWireError::UserError(Box::new(ErrorInfo::new(
                         "ERROR".to_owned(),
@@ -348,16 +334,12 @@
                     if qrep_flow_job.disabled {
                         let create_mirror_success =
                             format!("CREATE MIRROR {}", qrep_flow_job.name);
-                        return Ok(vec![Response::Execution(Tag::new(
-                            &create_mirror_success,
-                        ))]);
+                        return Ok(vec![Response::Execution(Tag::new(&create_mirror_success))]);
                     }
 
                     let _workflow_id = self.run_qrep_mirror(qrep_flow_job).await?;
                     let create_mirror_success = format!("CREATE MIRROR {}", qrep_flow_job.name);
-                    Ok(vec![Response::Execution(Tag::new(
-                        &create_mirror_success,
-                    ))])
+                    Ok(vec![Response::Execution(Tag::new(&create_mirror_success))])
                 } else {
                     Self::handle_mirror_existence(*if_not_exists, &qrep_flow_job.name)
                 }
@@ -397,9 +379,7 @@
                             e.to_string(),
                         )))
                     })?;
-                    Ok(vec![Response::Execution(Tag::new(
-                        "OK",
-                    ))])
+                    Ok(vec![Response::Execution(Tag::new("OK"))])
                 }
                 PeerDDL::CreateMirrorForCDC {
                     if_not_exists,
@@ -478,9 +458,7 @@
                     })?;
 
                     let create_mirror_success = format!("CREATE MIRROR {}", flow_job.name);
-                    Ok(vec![Response::Execution(Tag::new(
-                        &create_mirror_success,
-                    ))])
+                    Ok(vec![Response::Execution(Tag::new(&create_mirror_success))])
                 } else {
                     Self::handle_mirror_existence(*if_not_exists, &flow_job.name)
                 }
@@ -507,9 +485,7 @@
                 } {
                     let workflow_id = self.run_qrep_mirror(&job).await?;
                     let create_mirror_success = format!("STARTED WORKFLOW {}", workflow_id);
-                    Ok(vec![Response::Execution(Tag::new(
-                        &create_mirror_success,
-                    ))])
+                    Ok(vec![Response::Execution(Tag::new(&create_mirror_success))])
                 } else {
                     Err(PgWireError::UserError(Box::new(ErrorInfo::new(
                         "ERROR".to_owned(),
@@ -551,14 +527,10 @@
                         PgWireError::ApiError(format!("unable to drop peer: {:?}", err).into())
                     })?;
                     let drop_peer_success = format!("DROP PEER {}", peer_name);
-                    Ok(vec![Response::Execution(Tag::new(
-                        &drop_peer_success,
-                    ))])
+                    Ok(vec![Response::Execution(Tag::new(&drop_peer_success))])
                 } else if *if_exists {
                     let no_peer_success = "NO SUCH PEER";
-                    Ok(vec![Response::Execution(Tag::new(
-                        no_peer_success,
-                    ))])
+                    Ok(vec![Response::Execution(Tag::new(no_peer_success))])
                 } else {
                     Err(PgWireError::UserError(Box::new(ErrorInfo::new(
                         "ERROR".to_owned(),
@@ -639,15 +611,11 @@
                     })?;
 
                     let resync_mirror_success = format!("RESYNC MIRROR {}", mirror_name);
-                    Ok(vec![Response::Execution(Tag::new(
-                        &resync_mirror_success,
-                    ))])
+                    Ok(vec![Response::Execution(Tag::new(&resync_mirror_success))])
                 }
                 None => {
                     let no_peer_success = "NO SUCH QREP MIRROR";
-                    Ok(vec![Response::Execution(Tag::new(
-                        no_peer_success,
-                    ))])
+                    Ok(vec![Response::Execution(Tag::new(no_peer_success))])
                 }
             }
         }
@@ -697,14 +665,10 @@
                             )
                         })?;
                     let drop_mirror_success = format!("PAUSE MIRROR {}", flow_job_name);
-                    Ok(vec![Response::Execution(Tag::new(
-                        &drop_mirror_success,
-                    ))])
+                    Ok(vec![Response::Execution(Tag::new(&drop_mirror_success))])
                 } else if *if_exists {
                     let no_mirror_success = "NO SUCH MIRROR";
-                    Ok(vec![Response::Execution(Tag::new(
-                        no_mirror_success,
-                    ))])
+                    Ok(vec![Response::Execution(Tag::new(no_mirror_success))])
                 } else {
                     Err(PgWireError::UserError(Box::new(ErrorInfo::new(
                         "ERROR".to_owned(),
@@ -759,14 +723,10 @@
                             )
                         })?;
                     let resume_mirror_success = format!("RESUME MIRROR {}", flow_job_name);
-                    Ok(vec![Response::Execution(Tag::new(
-                        &resume_mirror_success
-                    ))])
+                    Ok(vec![Response::Execution(Tag::new(&resume_mirror_success))])
                 } else if *if_exists {
                     let no_mirror_success = "NO SUCH MIRROR";
-                    Ok(vec![Response::Execution(Tag::new(
-                        no_mirror_success,
-                    ))])
+                    Ok(vec![Response::Execution(Tag::new(no_mirror_success))])
                 } else {
                     Err(PgWireError::UserError(Box::new(ErrorInfo::new(
                         "ERROR".to_owned(),
