Skip to content

Commit

Permalink
Add support for more properties to be passed with TableMapping (#500)
Browse files Browse the repository at this point in the history
  • Loading branch information
iskakaushik authored Oct 11, 2023
1 parent 1213a83 commit 1fa5903
Show file tree
Hide file tree
Showing 17 changed files with 1,210 additions and 954 deletions.
7 changes: 6 additions & 1 deletion flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,11 +188,16 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,
"flowName": input.FlowConnectionConfigs.FlowJobName,
}).Info("pulling records...")

tblNameMapping := make(map[string]string)
for _, v := range input.FlowConnectionConfigs.TableMappings {
tblNameMapping[v.SourceTableIdentifier] = v.DestinationTableIdentifier
}

startTime := time.Now()
recordsWithTableSchemaDelta, err := srcConn.PullRecords(&model.PullRecordsRequest{
FlowJobName: input.FlowConnectionConfigs.FlowJobName,
SrcTableIDNameMapping: input.FlowConnectionConfigs.SrcTableIdNameMapping,
TableNameMapping: input.FlowConnectionConfigs.TableNameMapping,
TableNameMapping: tblNameMapping,
LastSyncState: input.LastSyncState,
MaxBatchSize: uint32(input.SyncFlowOptions.BatchSize),
IdleTimeout: 10 * time.Second,
Expand Down
9 changes: 4 additions & 5 deletions flow/cmd/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,18 +65,17 @@ func (h *FlowRequestHandler) createFlowJobEntry(ctx context.Context,
req.ConnectionConfigs.Destination.Name, srcErr)
}

for sourceTableIdentifier := range req.ConnectionConfigs.TableNameMapping {
destinationTableIdentifier := req.ConnectionConfigs.TableNameMapping[sourceTableIdentifier]
for _, v := range req.ConnectionConfigs.TableMappings {
_, err := h.pool.Exec(ctx, `
INSERT INTO flows (workflow_id, name, source_peer, destination_peer, description,
source_table_identifier, destination_table_identifier) VALUES ($1, $2, $3, $4, $5, $6, $7)
`, workflowID, req.ConnectionConfigs.FlowJobName, sourcePeerID, destinationPeerID,
"Mirror created via GRPC",
schemaForTableIdentifier(sourceTableIdentifier, sourePeerType),
schemaForTableIdentifier(destinationTableIdentifier, destinationPeerType))
schemaForTableIdentifier(v.SourceTableIdentifier, sourePeerType),
schemaForTableIdentifier(v.DestinationTableIdentifier, destinationPeerType))
if err != nil {
return fmt.Errorf("unable to insert into flows table for flow %s with source table %s: %w",
req.ConnectionConfigs.FlowJobName, sourceTableIdentifier, err)
req.ConnectionConfigs.FlowJobName, v.SourceTableIdentifier, err)
}
}

Expand Down
10 changes: 9 additions & 1 deletion flow/e2e/congen.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,9 +145,17 @@ func GenerateSnowflakePeer(snowflakeConfig *protos.SnowflakeConfig) (*protos.Pee
}

func (c *FlowConnectionGenerationConfig) GenerateFlowConnectionConfigs() (*protos.FlowConnectionConfigs, error) {
tblMappings := []*protos.TableMapping{}
for k, v := range c.TableNameMapping {
tblMappings = append(tblMappings, &protos.TableMapping{
SourceTableIdentifier: k,
DestinationTableIdentifier: v,
})
}

ret := &protos.FlowConnectionConfigs{}
ret.FlowJobName = c.FlowJobName
ret.TableNameMapping = c.TableNameMapping
ret.TableMappings = tblMappings
ret.Source = GeneratePostgresPeer(c.PostgresPort)
ret.Destination = c.Destination
ret.CdcSyncMode = c.CDCSyncMode
Expand Down
Loading

0 comments on commit 1fa5903

Please sign in to comment.