Skip to content

Commit

Permalink
prefer Items to T
Browse files Browse the repository at this point in the history
  • Loading branch information
serprex committed Apr 9, 2024
1 parent a37f7e6 commit 2a22de1
Show file tree
Hide file tree
Showing 5 changed files with 29 additions and 31 deletions.
12 changes: 6 additions & 6 deletions flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,14 +315,14 @@ func (a *FlowableActivity) SyncPg(
connectors.CDCSyncPgConnector.ReplayPgTableSchemaDeltas)
}

func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncConnectorCore, T model.Items](
func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncConnectorCore, Items model.Items](
ctx context.Context,
a *FlowableActivity,
config *protos.FlowConnectionConfigs,
options *protos.SyncFlowOptions,
sessionID string,
pull func(TPull, context.Context, *pgxpool.Pool, *model.PullRecordsRequest[T]) error,
sync func(TSync, context.Context, *model.SyncRecordsRequest[T]) (*model.SyncResponse, error),
pull func(TPull, context.Context, *pgxpool.Pool, *model.PullRecordsRequest[Items]) error,
sync func(TSync, context.Context, *model.SyncRecordsRequest[Items]) (*model.SyncResponse, error),
replay func(TSync, context.Context, string, []*protos.TableSchemaDelta) error,
) (*model.SyncResponse, error) {
flowName := config.FlowJobName
Expand Down Expand Up @@ -378,12 +378,12 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon
logger.Info("pulling records...", slog.Int64("LastOffset", lastOffset))

// start a goroutine to pull records from the source
recordBatch := model.NewCDCStream[T]()
recordBatch := model.NewCDCStream[Items]()
startTime := time.Now()

errGroup, errCtx := errgroup.WithContext(ctx)
errGroup.Go(func() error {
return pull(srcConn, errCtx, a.CatalogPool, &model.PullRecordsRequest[T]{
return pull(srcConn, errCtx, a.CatalogPool, &model.PullRecordsRequest[Items]{
FlowJobName: flowName,
SrcTableIDNameMapping: options.SrcTableIdNameMapping,
TableNameMapping: tblNameMapping,
Expand Down Expand Up @@ -448,7 +448,7 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon
}

syncStartTime = time.Now()
res, err = sync(dstConn, errCtx, &model.SyncRecordsRequest[T]{
res, err = sync(dstConn, errCtx, &model.SyncRecordsRequest[Items]{
SyncBatchID: syncBatchID,
Records: recordBatch,
FlowJobName: flowName,
Expand Down
20 changes: 10 additions & 10 deletions flow/connectors/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -313,24 +313,24 @@ func (c *PostgresConnector) PullRecords(
catalogPool *pgxpool.Pool,
req *model.PullRecordsRequest[model.RecordItems],
) error {
return PullCore(ctx, c, catalogPool, req, (*PostgresCDCSource).PullRecords)
return pullCore(ctx, c, catalogPool, req, (*PostgresCDCSource).PullRecords)
}

func (c *PostgresConnector) PullPg(
ctx context.Context,
catalogPool *pgxpool.Pool,
req *model.PullRecordsRequest[model.PgItems],
) error {
return PullCore(ctx, c, catalogPool, req, (*PostgresCDCSource).PullPg)
return pullCore(ctx, c, catalogPool, req, (*PostgresCDCSource).PullPg)
}

// PullRecords pulls records from the source.
func PullCore[T model.Items](
func pullCore[Items model.Items](
ctx context.Context,
c *PostgresConnector,
catalogPool *pgxpool.Pool,
req *model.PullRecordsRequest[T],
pull func(*PostgresCDCSource, context.Context, *model.PullRecordsRequest[T]) error,
req *model.PullRecordsRequest[Items],
pull func(*PostgresCDCSource, context.Context, *model.PullRecordsRequest[Items]) error,
) error {
defer func() {
req.RecordStream.Close()
Expand Down Expand Up @@ -428,10 +428,10 @@ func (c *PostgresConnector) SyncPg(ctx context.Context, req *model.SyncRecordsRe
}

// syncRecordsCore pushes records to the destination.
func syncRecordsCore[T model.Items](
func syncRecordsCore[Items model.Items](
ctx context.Context,
c *PostgresConnector,
req *model.SyncRecordsRequest[T],
req *model.SyncRecordsRequest[Items],
replayTableSchemaDeltas func(*PostgresConnector, context.Context, string, []*protos.TableSchemaDelta) error,
) (*model.SyncResponse, error) {
rawTableIdentifier := getRawTableIdentifier(req.FlowJobName)
Expand All @@ -447,7 +447,7 @@ func syncRecordsCore[T model.Items](
} else {
var row []any
switch typedRecord := record.(type) {
case *model.InsertRecord[T]:
case *model.InsertRecord[Items]:
itemsJSON, err := typedRecord.Items.ToJSONWithOptions(model.ToJSONOptions{
UnnestColumns: nil,
HStoreAsJSON: false,
Expand All @@ -467,7 +467,7 @@ func syncRecordsCore[T model.Items](
"",
}

case *model.UpdateRecord[T]:
case *model.UpdateRecord[Items]:
newItemsJSON, err := typedRecord.NewItems.ToJSONWithOptions(model.ToJSONOptions{
UnnestColumns: nil,
HStoreAsJSON: false,
Expand All @@ -494,7 +494,7 @@ func syncRecordsCore[T model.Items](
utils.KeysToString(typedRecord.UnchangedToastColumns),
}

case *model.DeleteRecord[T]:
case *model.DeleteRecord[Items]:
itemsJSON, err := typedRecord.Items.ToJSONWithOptions(model.ToJSONOptions{
UnnestColumns: nil,
HStoreAsJSON: false,
Expand Down
10 changes: 5 additions & 5 deletions flow/connectors/utils/cdc_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ func encVal(val any) ([]byte, error) {
return buf.Bytes(), nil
}

type cdcStore[T model.Items] struct {
inMemoryRecords map[model.TableWithPkey]model.Record[T]
type cdcStore[Items model.Items] struct {
inMemoryRecords map[model.TableWithPkey]model.Record[Items]
pebbleDB *pebble.DB
flowJobName string
dbFolderName string
Expand All @@ -43,9 +43,9 @@ type cdcStore[T model.Items] struct {
numRecordsSwitchThreshold int
}

func NewCDCStore[T model.Items](flowJobName string) *cdcStore[T] {
return &cdcStore[T]{
inMemoryRecords: make(map[model.TableWithPkey]model.Record[T]),
func NewCDCStore[Items model.Items](flowJobName string) *cdcStore[Items] {
return &cdcStore[Items]{
inMemoryRecords: make(map[model.TableWithPkey]model.Record[Items]),
pebbleDB: nil,
numRecords: atomic.Int32{},
flowJobName: flowJobName,
Expand Down
10 changes: 5 additions & 5 deletions flow/connectors/utils/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
"github.com/PeerDB-io/peer-flow/model/qvalue"
)

func RecordsToRawTableStream[T model.Items](req *model.RecordsToStreamRequest[T]) (*model.QRecordStream, error) {
func RecordsToRawTableStream[Items model.Items](req *model.RecordsToStreamRequest[Items]) (*model.QRecordStream, error) {
recordStream := model.NewQRecordStream(1 << 17)
err := recordStream.SetSchema(&qvalue.QRecordSchema{
Fields: []qvalue.QField{
Expand Down Expand Up @@ -73,10 +73,10 @@ func RecordsToRawTableStream[T model.Items](req *model.RecordsToStreamRequest[T]
return recordStream, nil
}

func recordToQRecordOrError[T model.Items](batchID int64, record model.Record[T]) model.QRecordOrError {
func recordToQRecordOrError[Items model.Items](batchID int64, record model.Record[Items]) model.QRecordOrError {
var entries [8]qvalue.QValue
switch typedRecord := record.(type) {
case *model.InsertRecord[T]:
case *model.InsertRecord[Items]:
// json.Marshal converts bytes in Hex automatically to BASE64 string.
itemsJSON, err := model.ItemsToJSON(typedRecord.Items)
if err != nil {
Expand All @@ -89,7 +89,7 @@ func recordToQRecordOrError[T model.Items](batchID int64, record model.Record[T]
entries[4] = qvalue.QValueInt64{Val: 0}
entries[5] = qvalue.QValueString{Val: ""}
entries[7] = qvalue.QValueString{Val: ""}
case *model.UpdateRecord[T]:
case *model.UpdateRecord[Items]:
newItemsJSON, err := model.ItemsToJSON(typedRecord.NewItems)
if err != nil {
return model.QRecordOrError{
Expand All @@ -108,7 +108,7 @@ func recordToQRecordOrError[T model.Items](batchID int64, record model.Record[T]
entries[5] = qvalue.QValueString{Val: oldItemsJSON}
entries[7] = qvalue.QValueString{Val: KeysToString(typedRecord.UnchangedToastColumns)}

case *model.DeleteRecord[T]:
case *model.DeleteRecord[Items]:
itemsJSON, err := model.ItemsToJSON(typedRecord.Items)
if err != nil {
return model.QRecordOrError{
Expand Down
8 changes: 3 additions & 5 deletions flow/model/cdc_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,9 @@ import (
type CDCStream[T Items] struct {
// empty signal to indicate if the records are going to be empty or not.
emptySignal chan bool
// Records are a list of records.
records chan Record[T]
// Schema changes from the slot
SchemaDeltas []*protos.TableSchemaDelta
// Indicates if the last checkpoint has been set.
records chan Record[T]
// Schema changes from slot
SchemaDeltas []*protos.TableSchemaDelta
lastCheckpointSet bool
// lastCheckpointID is the last ID of the commit that corresponds to this batch.
lastCheckpointID atomic.Int64
Expand Down

0 comments on commit 2a22de1

Please sign in to comment.