From 83e505c0992ec73bf32b2df2a02019893c3034ef Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Sat, 18 May 2024 21:25:27 +0000 Subject: [PATCH] copy any not bytes kinda silly decoding values only to reencode, but getting around would mean lifting pgx code to send copy ourselves --- flow/activities/flowable.go | 4 ++-- .../connectors/postgres/qrep_query_executor.go | 9 ++------- flow/model/pgrecord_copy_from_source.go | 18 +++--------------- flow/model/qrecord_copy_from_source.go | 2 +- flow/model/qrecord_stream.go | 2 +- 5 files changed, 9 insertions(+), 26 deletions(-) diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go index 4d856ffd3e..cf41a88643 100644 --- a/flow/activities/flowable.go +++ b/flow/activities/flowable.go @@ -453,7 +453,7 @@ func (a *FlowableActivity) ReplicateQRepPartitions(ctx context.Context, connectors.QRepSyncConnector.SyncQRepRecords, ) case protos.TypeSystem_PG: - stream := model.NewRecordStream[[]byte](shared.FetchAndChannelSize) + stream := model.NewRecordStream[any](shared.FetchAndChannelSize) err = replicateQRepPartition(ctx, a, config, i+1, numPartitions, p, runUUID, stream, stream, connectors.QRepPullPgConnector.PullPgQRepRecords, @@ -776,7 +776,7 @@ func (a *FlowableActivity) ReplicateXminPartition(ctx context.Context, connectors.QRepSyncConnector.SyncQRepRecords) case protos.TypeSystem_PG: return replicateXminPartition(ctx, a, config, partition, runUUID, - model.NewRecordStream[[]byte](shared.FetchAndChannelSize), + model.NewRecordStream[any](shared.FetchAndChannelSize), (*connpostgres.PostgresConnector).PullXminPgRecordStream, connectors.QRepSyncPgConnector.SyncPgQRepRecords) default: diff --git a/flow/connectors/postgres/qrep_query_executor.go b/flow/connectors/postgres/qrep_query_executor.go index 7ded382ea7..b04e421dbf 100644 --- a/flow/connectors/postgres/qrep_query_executor.go +++ b/flow/connectors/postgres/qrep_query_executor.go @@ -4,7 +4,6 @@ import ( "context" "fmt" "log/slog" - "slices" "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgconn" @@ -432,10 +431,6 @@ func (qe *QRepQueryExecutor) mapRowToQRecord( func (qe *QRepQueryExecutor) mapRowToPgRecord( row pgx.Rows, _ []pgconn.FieldDescription, -) ([][]byte, error) { - raw := row.RawValues() - for i, val := range raw { - raw[i] = slices.Clone(val) - } - return slices.Clone(raw), nil +) ([]any, error) { + return row.Values() } diff --git a/flow/model/pgrecord_copy_from_source.go b/flow/model/pgrecord_copy_from_source.go index 331667ff2d..d53ee3cd02 100644 --- a/flow/model/pgrecord_copy_from_source.go +++ b/flow/model/pgrecord_copy_from_source.go @@ -1,12 +1,8 @@ package model -import ( - "github.com/PeerDB-io/peer-flow/shared" -) - type PgRecordCopyFromSource struct { stream *PgRecordStream - currentRecord [][]byte + currentRecord []any } func NewPgRecordCopyFromSource( @@ -21,19 +17,11 @@ func NewPgRecordCopyFromSource( func (src *PgRecordCopyFromSource) Next() bool { rec, ok := <-src.stream.Records src.currentRecord = rec - return ok + return ok || src.Err() != nil } func (src *PgRecordCopyFromSource) Values() ([]interface{}, error) { - if err := src.Err(); err != nil { - return nil, err - } - - values := make([]interface{}, len(src.currentRecord)) - for i, val := range src.currentRecord { - values[i] = shared.UnsafeFastReadOnlyBytesToString(val) - } - return values, nil + return src.currentRecord, src.Err() } func (src *PgRecordCopyFromSource) Err() error { diff --git a/flow/model/qrecord_copy_from_source.go b/flow/model/qrecord_copy_from_source.go index fba2f0f59b..f354549300 100644 --- a/flow/model/qrecord_copy_from_source.go +++ b/flow/model/qrecord_copy_from_source.go @@ -42,7 +42,7 @@ func NewQRecordCopyFromSource( func (src *QRecordCopyFromSource) Next() bool { rec, ok := <-src.stream.Records src.currentRecord = rec - return ok + return ok && src.Err() != nil } func (src *QRecordCopyFromSource) Values() ([]interface{}, error) { diff --git a/flow/model/qrecord_stream.go b/flow/model/qrecord_stream.go index 4c9563164b..2247126c17 100644 --- a/flow/model/qrecord_stream.go +++ b/flow/model/qrecord_stream.go @@ -22,7 +22,7 @@ type RecordStream[T any] struct { type ( QRecordStream = RecordStream[qvalue.QValue] - PgRecordStream = RecordStream[[]byte] + PgRecordStream = RecordStream[any] ) type RecordsToStreamRequest[T Items] struct {