diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go index 5a288d532a..287793c7ab 100644 --- a/flow/activities/flowable.go +++ b/flow/activities/flowable.go @@ -770,13 +770,15 @@ func (a *FlowableActivity) ReplicateXminPartition(ctx context.Context, ) (int64, error) { switch config.System { case protos.TypeSystem_Q: + stream := model.NewQRecordStream(shared.FetchAndChannelSize) return replicateXminPartition(ctx, a, config, partition, runUUID, - model.NewQRecordStream(shared.FetchAndChannelSize), + stream, stream, (*connpostgres.PostgresConnector).PullXminRecordStream, connectors.QRepSyncConnector.SyncQRepRecords) case protos.TypeSystem_PG: + pgread, pgwrite := connpostgres.NewPgCopyPipe() return replicateXminPartition(ctx, a, config, partition, runUUID, - model.NewRecordStream[any](shared.FetchAndChannelSize), + pgwrite, pgread, (*connpostgres.PostgresConnector).PullXminPgRecordStream, connectors.QRepSyncPgConnector.SyncPgQRepRecords) default: diff --git a/flow/activities/flowable_core.go b/flow/activities/flowable_core.go index 7101c5fa44..1434a01184 100644 --- a/flow/activities/flowable_core.go +++ b/flow/activities/flowable_core.go @@ -398,7 +398,8 @@ func replicateQRepPartition[TRead any, TWrite any, TSync connectors.QRepSyncConn } // replicateXminPartition replicates a XminPartition from the source to the destination. -func replicateXminPartition[TRead any, TWrite any, TSync connectors.QRepSyncConnectorCore](ctx context.Context, +func replicateXminPartition[TRead any, TWrite any, TSync connectors.QRepSyncConnectorCore]( + ctx context.Context, a *FlowableActivity, config *protos.QRepConfig, partition *protos.QRepPartition, diff --git a/flow/connectors/postgres/qrep.go b/flow/connectors/postgres/qrep.go index 854b84f1d5..e9b0a5558e 100644 --- a/flow/connectors/postgres/qrep.go +++ b/flow/connectors/postgres/qrep.go @@ -4,7 +4,6 @@ import ( "bytes" "context" "fmt" - "io" "log/slog" "strconv" "strings" @@ -317,9 +316,9 @@ func (c *PostgresConnector) PullPgQRepRecords( ctx context.Context, config *protos.QRepConfig, partition *protos.QRepPartition, - stream *io.PipeWriter, + stream PgCopyWriter, ) (int, error) { - return corePullQRepRecords(c, ctx, config, partition, PgCopyWriter{PipeWriter: stream}) + return corePullQRepRecords(c, ctx, config, partition, stream) } func corePullQRepRecords( @@ -482,6 +481,7 @@ func syncQRepRecords( numRowsSynced, err = sink.CopyInto( ctx, + c, tx, pgx.Identifier{dstTable.Schema, dstTable.Table}, ) @@ -523,6 +523,7 @@ func syncQRepRecords( // Step 2.2: Insert records into the staging table numRowsSynced, err = sink.CopyInto( ctx, + c, tx, stagingTableIdentifier, ) diff --git a/flow/connectors/postgres/sink.go b/flow/connectors/postgres/sink.go index 9d619737fe..831bc75977 100644 --- a/flow/connectors/postgres/sink.go +++ b/flow/connectors/postgres/sink.go @@ -8,7 +8,6 @@ import ( "strings" "github.com/jackc/pgx/v5" - "github.com/jackc/pgx/v5/pgconn" "github.com/PeerDB-io/peer-flow/model" "github.com/PeerDB-io/peer-flow/shared" @@ -21,7 +20,7 @@ type QuerySinkWriter interface { type QuerySinkReader interface { GetColumnNames() []string - CopyInto(context.Context, pgx.Tx, pgx.Identifier) (int64, error) + CopyInto(context.Context, *PostgresConnector, pgx.Tx, pgx.Identifier) (int64, error) } type RecordStreamSink struct { @@ -29,10 +28,9 @@ type RecordStreamSink struct { } type PgCopyShared struct { - schema []pgconn.FieldDescription - schemaSet bool schemaLatch chan struct{} - err error + schema []string + schemaSet bool } type PgCopyWriter struct { @@ -124,7 +122,7 @@ func (stream RecordStreamSink) ExecuteQueryWithTx( return totalRecordsFetched, nil } -func (stream RecordStreamSink) CopyInto(ctx context.Context, tx pgx.Tx, table pgx.Identifier) (int64, error) { +func (stream RecordStreamSink) CopyInto(ctx context.Context, _ *PostgresConnector, tx pgx.Tx, table pgx.Identifier) (int64, error) { return tx.CopyFrom(ctx, table, stream.GetColumnNames(), model.NewQRecordCopyFromSource(stream.QRecordStream)) } @@ -132,7 +130,7 @@ func (stream RecordStreamSink) GetColumnNames() []string { return stream.Schema().GetColumnNames() } -func (p PgCopyWriter) SetSchema(schema []pgconn.FieldDescription) { +func (p PgCopyWriter) SetSchema(schema []string) { if !p.schema.schemaSet { p.schema.schema = schema close(p.schema.schemaLatch) @@ -169,20 +167,21 @@ func (p PgCopyWriter) ExecuteQueryWithTx( if err != nil { return 0, err } + fieldDescriptions := norows.FieldDescriptions() cols := make([]string, 0, len(fieldDescriptions)) for _, fd := range fieldDescriptions { cols = append(cols, QuoteIdentifier(fd.Name)) } if !p.IsSchemaSet() { - p.SetSchema(fieldDescriptions) + p.SetSchema(cols) } norows.Close() // TODO use pgx simple query arg parsing code (it's internal, need to copy) // TODO correctly interpolate for i, arg := range args { - query = strings.Replace(query, fmt.Sprintf("$%d", i), fmt.Sprint(arg), -1) + query = strings.ReplaceAll(query, fmt.Sprintf("$%d", i), fmt.Sprint(arg)) } copyQuery := fmt.Sprintf("COPY %s (%s) TO STDOUT", query, strings.Join(cols, ",")) @@ -215,12 +214,16 @@ func (p PgCopyWriter) Close(err error) { p.PipeWriter.CloseWithError(err) } -func (p PgCopyReader) CopyInto(ctx context.Context, qe *QRepQueryExecutor, tx pgx.Tx, table pgx.Identifier) (int64, error) { +func (p PgCopyReader) GetColumnNames() []string { + return p.schema.schema +} + +func (p PgCopyReader) CopyInto(ctx context.Context, c *PostgresConnector, tx pgx.Tx, table pgx.Identifier) (int64, error) { <-p.schema.schemaLatch cols := make([]string, 0, len(p.schema.schema)) - for _, fd := range p.schema.schema { - cols = append(cols, QuoteIdentifier(fd.Name)) + for _, col := range p.schema.schema { + cols = append(cols, QuoteIdentifier(col)) } - _, err := qe.conn.PgConn().CopyFrom(ctx, p.PipeReader, fmt.Sprintf("COPY %s (%s) FROM STDIN", table.Sanitize(), strings.Join(cols, ","))) + _, err := c.conn.PgConn().CopyFrom(ctx, p.PipeReader, fmt.Sprintf("COPY %s (%s) FROM STDIN", table.Sanitize(), strings.Join(cols, ","))) return 0, err }