Skip to content

Commit

Permalink
it builds
Browse files Browse the repository at this point in the history
  • Loading branch information
serprex committed May 24, 2024
1 parent 4978121 commit b956381
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 19 deletions.
6 changes: 4 additions & 2 deletions flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -770,13 +770,15 @@ func (a *FlowableActivity) ReplicateXminPartition(ctx context.Context,
) (int64, error) {
switch config.System {
case protos.TypeSystem_Q:
stream := model.NewQRecordStream(shared.FetchAndChannelSize)
return replicateXminPartition(ctx, a, config, partition, runUUID,
model.NewQRecordStream(shared.FetchAndChannelSize),
stream, stream,
(*connpostgres.PostgresConnector).PullXminRecordStream,
connectors.QRepSyncConnector.SyncQRepRecords)
case protos.TypeSystem_PG:
pgread, pgwrite := connpostgres.NewPgCopyPipe()
return replicateXminPartition(ctx, a, config, partition, runUUID,
model.NewRecordStream[any](shared.FetchAndChannelSize),
pgwrite, pgread,
(*connpostgres.PostgresConnector).PullXminPgRecordStream,
connectors.QRepSyncPgConnector.SyncPgQRepRecords)
default:
Expand Down
3 changes: 2 additions & 1 deletion flow/activities/flowable_core.go
Original file line number Diff line number Diff line change
Expand Up @@ -398,7 +398,8 @@ func replicateQRepPartition[TRead any, TWrite any, TSync connectors.QRepSyncConn
}

// replicateXminPartition replicates a XminPartition from the source to the destination.
func replicateXminPartition[TRead any, TWrite any, TSync connectors.QRepSyncConnectorCore](ctx context.Context,
func replicateXminPartition[TRead any, TWrite any, TSync connectors.QRepSyncConnectorCore](
ctx context.Context,
a *FlowableActivity,
config *protos.QRepConfig,
partition *protos.QRepPartition,
Expand Down
7 changes: 4 additions & 3 deletions flow/connectors/postgres/qrep.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"bytes"
"context"
"fmt"
"io"
"log/slog"
"strconv"
"strings"
Expand Down Expand Up @@ -317,9 +316,9 @@ func (c *PostgresConnector) PullPgQRepRecords(
ctx context.Context,
config *protos.QRepConfig,
partition *protos.QRepPartition,
stream *io.PipeWriter,
stream PgCopyWriter,
) (int, error) {
return corePullQRepRecords(c, ctx, config, partition, PgCopyWriter{PipeWriter: stream})
return corePullQRepRecords(c, ctx, config, partition, stream)
}

func corePullQRepRecords(
Expand Down Expand Up @@ -482,6 +481,7 @@ func syncQRepRecords(

numRowsSynced, err = sink.CopyInto(
ctx,
c,
tx,
pgx.Identifier{dstTable.Schema, dstTable.Table},
)
Expand Down Expand Up @@ -523,6 +523,7 @@ func syncQRepRecords(
// Step 2.2: Insert records into the staging table
numRowsSynced, err = sink.CopyInto(
ctx,
c,
tx,
stagingTableIdentifier,
)
Expand Down
29 changes: 16 additions & 13 deletions flow/connectors/postgres/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"strings"

"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgconn"

"github.com/PeerDB-io/peer-flow/model"
"github.com/PeerDB-io/peer-flow/shared"
Expand All @@ -21,18 +20,17 @@ type QuerySinkWriter interface {

type QuerySinkReader interface {
GetColumnNames() []string
CopyInto(context.Context, pgx.Tx, pgx.Identifier) (int64, error)
CopyInto(context.Context, *PostgresConnector, pgx.Tx, pgx.Identifier) (int64, error)
}

type RecordStreamSink struct {
*model.QRecordStream
}

type PgCopyShared struct {
schema []pgconn.FieldDescription
schemaSet bool
schemaLatch chan struct{}
err error
schema []string
schemaSet bool
}

type PgCopyWriter struct {
Expand Down Expand Up @@ -124,15 +122,15 @@ func (stream RecordStreamSink) ExecuteQueryWithTx(
return totalRecordsFetched, nil
}

func (stream RecordStreamSink) CopyInto(ctx context.Context, tx pgx.Tx, table pgx.Identifier) (int64, error) {
func (stream RecordStreamSink) CopyInto(ctx context.Context, _ *PostgresConnector, tx pgx.Tx, table pgx.Identifier) (int64, error) {
return tx.CopyFrom(ctx, table, stream.GetColumnNames(), model.NewQRecordCopyFromSource(stream.QRecordStream))
}

func (stream RecordStreamSink) GetColumnNames() []string {
return stream.Schema().GetColumnNames()
}

func (p PgCopyWriter) SetSchema(schema []pgconn.FieldDescription) {
func (p PgCopyWriter) SetSchema(schema []string) {
if !p.schema.schemaSet {
p.schema.schema = schema
close(p.schema.schemaLatch)
Expand Down Expand Up @@ -169,20 +167,21 @@ func (p PgCopyWriter) ExecuteQueryWithTx(
if err != nil {
return 0, err
}

fieldDescriptions := norows.FieldDescriptions()
cols := make([]string, 0, len(fieldDescriptions))
for _, fd := range fieldDescriptions {
cols = append(cols, QuoteIdentifier(fd.Name))
}
if !p.IsSchemaSet() {
p.SetSchema(fieldDescriptions)
p.SetSchema(cols)
}
norows.Close()

// TODO use pgx simple query arg parsing code (it's internal, need to copy)
// TODO correctly interpolate
for i, arg := range args {
query = strings.Replace(query, fmt.Sprintf("$%d", i), fmt.Sprint(arg), -1)
query = strings.ReplaceAll(query, fmt.Sprintf("$%d", i), fmt.Sprint(arg))
}

copyQuery := fmt.Sprintf("COPY %s (%s) TO STDOUT", query, strings.Join(cols, ","))
Expand Down Expand Up @@ -215,12 +214,16 @@ func (p PgCopyWriter) Close(err error) {
p.PipeWriter.CloseWithError(err)
}

func (p PgCopyReader) CopyInto(ctx context.Context, qe *QRepQueryExecutor, tx pgx.Tx, table pgx.Identifier) (int64, error) {
func (p PgCopyReader) GetColumnNames() []string {
return p.schema.schema
}

func (p PgCopyReader) CopyInto(ctx context.Context, c *PostgresConnector, tx pgx.Tx, table pgx.Identifier) (int64, error) {
<-p.schema.schemaLatch
cols := make([]string, 0, len(p.schema.schema))
for _, fd := range p.schema.schema {
cols = append(cols, QuoteIdentifier(fd.Name))
for _, col := range p.schema.schema {
cols = append(cols, QuoteIdentifier(col))
}
_, err := qe.conn.PgConn().CopyFrom(ctx, p.PipeReader, fmt.Sprintf("COPY %s (%s) FROM STDIN", table.Sanitize(), strings.Join(cols, ",")))
_, err := c.conn.PgConn().CopyFrom(ctx, p.PipeReader, fmt.Sprintf("COPY %s (%s) FROM STDIN", table.Sanitize(), strings.Join(cols, ",")))
return 0, err
}

0 comments on commit b956381

Please sign in to comment.