Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/main' into spiritus-mundi
Browse files Browse the repository at this point in the history
  • Loading branch information
serprex committed Feb 14, 2024
2 parents c0d46b6 + aa095a5 commit d217029
Show file tree
Hide file tree
Showing 42 changed files with 2,261 additions and 1,800 deletions.
57 changes: 51 additions & 6 deletions flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"fmt"
"log/slog"
"sync"
"sync/atomic"
"time"

"github.com/jackc/pglogrepl"
Expand Down Expand Up @@ -157,25 +158,69 @@ func (a *FlowableActivity) GetTableSchema(
return srcConn.GetTableSchema(ctx, config)
}

// CreateNormalizedTable creates a normalized table in the destination flowable.
// CreateNormalizedTable creates normalized tables in destination.
func (a *FlowableActivity) CreateNormalizedTable(
ctx context.Context,
config *protos.SetupNormalizedTableBatchInput,
) (*protos.SetupNormalizedTableBatchOutput, error) {
logger := activity.GetLogger(ctx)
ctx = context.WithValue(ctx, shared.FlowNameKey, config.FlowName)
conn, err := connectors.GetCDCSyncConnector(ctx, config.PeerConnectionConfig)
conn, err := connectors.GetConnectorAs[connectors.NormalizedTablesConnector](ctx, config.PeerConnectionConfig)
if err != nil {
if err == connectors.ErrUnsupportedFunctionality {
logger.Info("Connector does not implement normalized tables")
return nil, nil
}
return nil, fmt.Errorf("failed to get connector: %w", err)
}
defer connectors.CloseConnector(ctx, conn)

setupNormalizedTablesOutput, err := conn.SetupNormalizedTables(ctx, config)
tx, err := conn.StartSetupNormalizedTables(ctx)
if err != nil {
a.Alerter.LogFlowError(ctx, config.FlowName, err)
return nil, fmt.Errorf("failed to setup normalized tables: %w", err)
return nil, fmt.Errorf("failed to setup normalized tables tx: %w", err)
}
defer conn.CleanupSetupNormalizedTables(ctx, tx)

numTablesSetup := atomic.Uint32{}
totalTables := uint32(len(config.TableNameSchemaMapping))
shutdown := utils.HeartbeatRoutine(ctx, func() string {
return fmt.Sprintf("setting up normalized tables - %d of %d done",
numTablesSetup.Load(), totalTables)
})
defer shutdown()

return setupNormalizedTablesOutput, nil
tableExistsMapping := make(map[string]bool)
for tableIdentifier, tableSchema := range config.TableNameSchemaMapping {
created, err := conn.SetupNormalizedTable(
ctx,
tx,
tableIdentifier,
tableSchema,
config.SoftDeleteColName,
config.SyncedAtColName,
)
if err != nil {
a.Alerter.LogFlowError(ctx, config.FlowName, err)
return nil, fmt.Errorf("failed to setup normalized table %s: %w", tableIdentifier, err)
}
tableExistsMapping[tableIdentifier] = created

numTablesSetup.Add(1)
if created {
logger.Info(fmt.Sprintf("created table %s", tableIdentifier))
} else {
logger.Info(fmt.Sprintf("table already exists %s", tableIdentifier))
}
}

err = conn.FinishSetupNormalizedTables(ctx, tx)
if err != nil {
return nil, fmt.Errorf("failed to commit normalized tables tx: %w", err)
}

return &protos.SetupNormalizedTableBatchOutput{
TableExistsMapping: tableExistsMapping,
}, nil
}

func (a *FlowableActivity) MaintainPull(
Expand Down
Loading

0 comments on commit d217029

Please sign in to comment.