Skip to content

Commit

Permalink
removed debug logging
Browse files Browse the repository at this point in the history
  • Loading branch information
heavycrystal committed Sep 25, 2023
1 parent 4a2cf37 commit 29b580b
Show file tree
Hide file tree
Showing 6 changed files with 8 additions and 11 deletions.
4 changes: 4 additions & 0 deletions flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -721,5 +721,9 @@ func (a *FlowableActivity) CreateAdditionalTable(
TableIdentifierMapping[srcTableIdentifier].GetPostgresTableIdentifier().RelId
input.AdditionalTableInfo.TableSchema = tableNameSchemaMapping.TableNameSchemaMapping[srcTableIdentifier]

log.WithFields(log.Fields{
"flowName": input.FlowConnectionConfigs.FlowJobName,
}).Infof("finished creating additional table %s\n", dstTableIdentifier)

return input.AdditionalTableInfo, nil
}
2 changes: 0 additions & 2 deletions flow/connectors/postgres/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -329,8 +329,6 @@ func (p *postgresCDCSource) processMessage(batch *model.RecordBatch, xld pglogre
log.Infof("RelationMessage => RelationID: %d, Namespace: %s, RelationName: %s, Columns: %v",
msg.RelationID, msg.Namespace, msg.RelationName, msg.Columns)
if p.relationMessageMapping[msg.RelationID] == nil {
fmt.Println("tableNameMapping: ", p.tableNameMapping)
fmt.Println("srcTableIDNameMapping: ", p.srcTableIDNameMapping)
_, ok := p.tableNameMapping[fmt.Sprintf("%s.%s", msg.Namespace, msg.RelationName)]
// either it's a table we are aware of from SetupFlow, so it is in the map
// or it is a table we are not aware of, in a schema we are aware of.
Expand Down
1 change: 0 additions & 1 deletion flow/connectors/postgres/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,6 @@ func (c *PostgresConnector) createSlotAndPublication(

stmt = fmt.Sprintf("CREATE PUBLICATION %s FOR TABLE %s", publication, tableNameString)
}
fmt.Println("PUBLICATION stmt: ", stmt)
_, err := c.pool.Exec(c.ctx, stmt)
if err != nil {
log.Warnf("Error creating publication '%s': %v", publication, err)
Expand Down
4 changes: 0 additions & 4 deletions flow/workflows/cdc_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,10 +286,6 @@ func CDCFlowWorkflowWithConfig(
cfg.SrcTableIdNameMapping[childSyncFlowRes.AdditionalTableInfo.RelId] = srcTableIdentifier
// because
cfg.TableNameSchemaMapping[dstTableIdentifier] = childSyncFlowRes.AdditionalTableInfo.TableSchema

fmt.Println("cfg tableNameMapping: ", cfg.TableNameMapping)
fmt.Println("cfg SrcTableIdNameMapping: ", cfg.SrcTableIdNameMapping)
fmt.Println("cfg TableNameSchemaMapping: ", cfg.TableNameSchemaMapping)
}
}
}
Expand Down
6 changes: 3 additions & 3 deletions flow/workflows/setup_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,14 +110,14 @@ func (s *SetupFlowExecution) ensurePullability(
})
tmpMap := make(map[uint32]string)

srcIdentifiers := maps.Keys(config.TableNameMapping)
slices.Sort(srcIdentifiers)
srcTblIdentifiers := maps.Keys(config.TableNameMapping)
slices.Sort(srcTblIdentifiers)

// create EnsurePullabilityInput for the srcTableName
ensurePullabilityInput := &protos.EnsurePullabilityBatchInput{
PeerConnectionConfig: config.Source,
FlowJobName: s.CDCFlowName,
SourceTableIdentifiers: srcIdentifiers,
SourceTableIdentifiers: srcTblIdentifiers,
}

future := workflow.ExecuteActivity(ctx, flowable.EnsurePullability, ensurePullabilityInput)
Expand Down
2 changes: 1 addition & 1 deletion nexus/sqlparser-rs
Submodule sqlparser-rs updated 1 files
+7 −6 src/parser.rs

0 comments on commit 29b580b

Please sign in to comment.