Skip to content

Commit

Permalink
copy any not bytes
Browse files Browse the repository at this point in the history
kinda silly decoding values only to reencode, but getting around would mean lifting pgx code to send copy ourselves
  • Loading branch information
serprex committed May 21, 2024
1 parent 66d2306 commit 83e505c
Show file tree
Hide file tree
Showing 5 changed files with 9 additions and 26 deletions.
4 changes: 2 additions & 2 deletions flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -453,7 +453,7 @@ func (a *FlowableActivity) ReplicateQRepPartitions(ctx context.Context,
connectors.QRepSyncConnector.SyncQRepRecords,
)
case protos.TypeSystem_PG:
stream := model.NewRecordStream[[]byte](shared.FetchAndChannelSize)
stream := model.NewRecordStream[any](shared.FetchAndChannelSize)
err = replicateQRepPartition(ctx, a, config, i+1, numPartitions, p, runUUID,
stream, stream,
connectors.QRepPullPgConnector.PullPgQRepRecords,
Expand Down Expand Up @@ -776,7 +776,7 @@ func (a *FlowableActivity) ReplicateXminPartition(ctx context.Context,
connectors.QRepSyncConnector.SyncQRepRecords)
case protos.TypeSystem_PG:
return replicateXminPartition(ctx, a, config, partition, runUUID,
model.NewRecordStream[[]byte](shared.FetchAndChannelSize),
model.NewRecordStream[any](shared.FetchAndChannelSize),
(*connpostgres.PostgresConnector).PullXminPgRecordStream,
connectors.QRepSyncPgConnector.SyncPgQRepRecords)
default:
Expand Down
9 changes: 2 additions & 7 deletions flow/connectors/postgres/qrep_query_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"fmt"
"log/slog"
"slices"

"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgconn"
Expand Down Expand Up @@ -432,10 +431,6 @@ func (qe *QRepQueryExecutor) mapRowToQRecord(
func (qe *QRepQueryExecutor) mapRowToPgRecord(
row pgx.Rows,
_ []pgconn.FieldDescription,
) ([][]byte, error) {
raw := row.RawValues()
for i, val := range raw {
raw[i] = slices.Clone(val)
}
return slices.Clone(raw), nil
) ([]any, error) {
return row.Values()
}
18 changes: 3 additions & 15 deletions flow/model/pgrecord_copy_from_source.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,8 @@
package model

import (
"github.com/PeerDB-io/peer-flow/shared"
)

type PgRecordCopyFromSource struct {
stream *PgRecordStream
currentRecord [][]byte
currentRecord []any
}

func NewPgRecordCopyFromSource(
Expand All @@ -21,19 +17,11 @@ func NewPgRecordCopyFromSource(
func (src *PgRecordCopyFromSource) Next() bool {
rec, ok := <-src.stream.Records
src.currentRecord = rec
return ok
return ok || src.Err() != nil
}

func (src *PgRecordCopyFromSource) Values() ([]interface{}, error) {
if err := src.Err(); err != nil {
return nil, err
}

values := make([]interface{}, len(src.currentRecord))
for i, val := range src.currentRecord {
values[i] = shared.UnsafeFastReadOnlyBytesToString(val)
}
return values, nil
return src.currentRecord, src.Err()
}

func (src *PgRecordCopyFromSource) Err() error {
Expand Down
2 changes: 1 addition & 1 deletion flow/model/qrecord_copy_from_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func NewQRecordCopyFromSource(
func (src *QRecordCopyFromSource) Next() bool {
rec, ok := <-src.stream.Records
src.currentRecord = rec
return ok
return ok && src.Err() != nil
}

func (src *QRecordCopyFromSource) Values() ([]interface{}, error) {
Expand Down
2 changes: 1 addition & 1 deletion flow/model/qrecord_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ type RecordStream[T any] struct {

type (
QRecordStream = RecordStream[qvalue.QValue]
PgRecordStream = RecordStream[[]byte]
PgRecordStream = RecordStream[any]
)

type RecordsToStreamRequest[T Items] struct {
Expand Down

0 comments on commit 83e505c

Please sign in to comment.