From 2a22de188b655ee5998a1d18f3133f1faddae3c1 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Philip=20Dub=C3=A9?=
Date: Tue, 9 Apr 2024 23:30:16 +0000
Subject: [PATCH] prefer Items to T

---
 flow/activities/flowable.go          | 12 ++++++------
 flow/connectors/postgres/postgres.go | 20 ++++++++++----------
 flow/connectors/utils/cdc_store.go   | 10 +++++-----
 flow/connectors/utils/stream.go      | 10 +++++-----
 flow/model/cdc_stream.go             |  8 +++-----
 5 files changed, 29 insertions(+), 31 deletions(-)

diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go
index e9cc494884..3213de8e2b 100644
--- a/flow/activities/flowable.go
+++ b/flow/activities/flowable.go
@@ -315,14 +315,14 @@ func (a *FlowableActivity) SyncPg(
 		connectors.CDCSyncPgConnector.ReplayPgTableSchemaDeltas)
 }
 
-func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncConnectorCore, T model.Items](
+func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncConnectorCore, Items model.Items](
 	ctx context.Context,
 	a *FlowableActivity,
 	config *protos.FlowConnectionConfigs,
 	options *protos.SyncFlowOptions,
 	sessionID string,
-	pull func(TPull, context.Context, *pgxpool.Pool, *model.PullRecordsRequest[T]) error,
-	sync func(TSync, context.Context, *model.SyncRecordsRequest[T]) (*model.SyncResponse, error),
+	pull func(TPull, context.Context, *pgxpool.Pool, *model.PullRecordsRequest[Items]) error,
+	sync func(TSync, context.Context, *model.SyncRecordsRequest[Items]) (*model.SyncResponse, error),
 	replay func(TSync, context.Context, string, []*protos.TableSchemaDelta) error,
 ) (*model.SyncResponse, error) {
 	flowName := config.FlowJobName
@@ -378,12 +378,12 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon
 	logger.Info("pulling records...", slog.Int64("LastOffset", lastOffset))
 
 	// start a goroutine to pull records from the source
-	recordBatch := model.NewCDCStream[T]()
+	recordBatch := model.NewCDCStream[Items]()
 	startTime := time.Now()
 
 	errGroup, errCtx := errgroup.WithContext(ctx)
 	errGroup.Go(func() error {
-		return pull(srcConn, errCtx, a.CatalogPool, &model.PullRecordsRequest[T]{
+		return pull(srcConn, errCtx, a.CatalogPool, &model.PullRecordsRequest[Items]{
 			FlowJobName:           flowName,
 			SrcTableIDNameMapping: options.SrcTableIdNameMapping,
 			TableNameMapping:      tblNameMapping,
@@ -448,7 +448,7 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon
 	}
 
 	syncStartTime = time.Now()
-	res, err = sync(dstConn, errCtx, &model.SyncRecordsRequest[T]{
+	res, err = sync(dstConn, errCtx, &model.SyncRecordsRequest[Items]{
 		SyncBatchID: syncBatchID,
 		Records:     recordBatch,
 		FlowJobName: flowName,
diff --git a/flow/connectors/postgres/postgres.go b/flow/connectors/postgres/postgres.go
index 632b616738..5be3bcc45b 100644
--- a/flow/connectors/postgres/postgres.go
+++ b/flow/connectors/postgres/postgres.go
@@ -313,7 +313,7 @@ func (c *PostgresConnector) PullRecords(
 	catalogPool *pgxpool.Pool,
 	req *model.PullRecordsRequest[model.RecordItems],
 ) error {
-	return PullCore(ctx, c, catalogPool, req, (*PostgresCDCSource).PullRecords)
+	return pullCore(ctx, c, catalogPool, req, (*PostgresCDCSource).PullRecords)
 }
 
 func (c *PostgresConnector) PullPg(
@@ -321,16 +321,16 @@ func (c *PostgresConnector) PullPg(
 	catalogPool *pgxpool.Pool,
 	req *model.PullRecordsRequest[model.PgItems],
 ) error {
-	return PullCore(ctx, c, catalogPool, req, (*PostgresCDCSource).PullPg)
+	return pullCore(ctx, c, catalogPool, req, (*PostgresCDCSource).PullPg)
 }
 
-// PullRecords pulls records from the source.
-func PullCore[T model.Items](
+// pullCore pulls records from the source.
+func pullCore[Items model.Items](
 	ctx context.Context,
 	c *PostgresConnector,
 	catalogPool *pgxpool.Pool,
-	req *model.PullRecordsRequest[T],
-	pull func(*PostgresCDCSource, context.Context, *model.PullRecordsRequest[T]) error,
+	req *model.PullRecordsRequest[Items],
+	pull func(*PostgresCDCSource, context.Context, *model.PullRecordsRequest[Items]) error,
 ) error {
 	defer func() {
 		req.RecordStream.Close()
@@ -428,10 +428,10 @@ func (c *PostgresConnector) SyncPg(ctx context.Context, req *model.SyncRecordsRe
 }
 
 // syncRecordsCore pushes records to the destination.
-func syncRecordsCore[T model.Items](
+func syncRecordsCore[Items model.Items](
 	ctx context.Context,
 	c *PostgresConnector,
-	req *model.SyncRecordsRequest[T],
+	req *model.SyncRecordsRequest[Items],
 	replayTableSchemaDeltas func(*PostgresConnector, context.Context, string, []*protos.TableSchemaDelta) error,
 ) (*model.SyncResponse, error) {
 	rawTableIdentifier := getRawTableIdentifier(req.FlowJobName)
@@ -447,7 +447,7 @@ func syncRecordsCore[T model.Items](
 	} else {
 		var row []any
 		switch typedRecord := record.(type) {
-		case *model.InsertRecord[T]:
+		case *model.InsertRecord[Items]:
 			itemsJSON, err := typedRecord.Items.ToJSONWithOptions(model.ToJSONOptions{
 				UnnestColumns: nil,
 				HStoreAsJSON:  false,
@@ -467,7 +467,7 @@ func syncRecordsCore[T model.Items](
 			"",
 		}
 
-		case *model.UpdateRecord[T]:
+		case *model.UpdateRecord[Items]:
 			newItemsJSON, err := typedRecord.NewItems.ToJSONWithOptions(model.ToJSONOptions{
 				UnnestColumns: nil,
 				HStoreAsJSON:  false,
@@ -494,7 +494,7 @@ func syncRecordsCore[T model.Items](
 			utils.KeysToString(typedRecord.UnchangedToastColumns),
 		}
 
-		case *model.DeleteRecord[T]:
+		case *model.DeleteRecord[Items]:
 			itemsJSON, err := typedRecord.Items.ToJSONWithOptions(model.ToJSONOptions{
 				UnnestColumns: nil,
 				HStoreAsJSON:  false,
diff --git a/flow/connectors/utils/cdc_store.go b/flow/connectors/utils/cdc_store.go
index 3b061f9c55..3b47371281 100644
--- a/flow/connectors/utils/cdc_store.go
+++ b/flow/connectors/utils/cdc_store.go
@@ -31,8 +31,8 @@ func encVal(val any) ([]byte, error) {
 	return buf.Bytes(), nil
 }
 
-type cdcStore[T model.Items] struct {
-	inMemoryRecords map[model.TableWithPkey]model.Record[T]
+type cdcStore[Items model.Items] struct {
+	inMemoryRecords map[model.TableWithPkey]model.Record[Items]
 	pebbleDB        *pebble.DB
 	flowJobName     string
 	dbFolderName    string
@@ -43,9 +43,9 @@ type cdcStore[T model.Items] struct {
 	numRecordsSwitchThreshold int
 }
 
-func NewCDCStore[T model.Items](flowJobName string) *cdcStore[T] {
-	return &cdcStore[T]{
-		inMemoryRecords: make(map[model.TableWithPkey]model.Record[T]),
+func NewCDCStore[Items model.Items](flowJobName string) *cdcStore[Items] {
+	return &cdcStore[Items]{
+		inMemoryRecords: make(map[model.TableWithPkey]model.Record[Items]),
 		pebbleDB:        nil,
 		numRecords:      atomic.Int32{},
 		flowJobName:     flowJobName,
diff --git a/flow/connectors/utils/stream.go b/flow/connectors/utils/stream.go
index e55c88b1a8..7c362502ec 100644
--- a/flow/connectors/utils/stream.go
+++ b/flow/connectors/utils/stream.go
@@ -11,7 +11,7 @@ import (
 	"github.com/PeerDB-io/peer-flow/model/qvalue"
 )
 
-func RecordsToRawTableStream[T model.Items](req *model.RecordsToStreamRequest[T]) (*model.QRecordStream, error) {
+func RecordsToRawTableStream[Items model.Items](req *model.RecordsToStreamRequest[Items]) (*model.QRecordStream, error) {
 	recordStream := model.NewQRecordStream(1 << 17)
 	err := recordStream.SetSchema(&qvalue.QRecordSchema{
 		Fields: []qvalue.QField{
@@ -73,10 +73,10 @@ func RecordsToRawTableStream[T model.Items](req *model.RecordsToStreamRequest[T]
 	return recordStream, nil
 }
 
-func recordToQRecordOrError[T model.Items](batchID int64, record model.Record[T]) model.QRecordOrError {
+func recordToQRecordOrError[Items model.Items](batchID int64, record model.Record[Items]) model.QRecordOrError {
 	var entries [8]qvalue.QValue
 	switch typedRecord := record.(type) {
-	case *model.InsertRecord[T]:
+	case *model.InsertRecord[Items]:
 		// json.Marshal converts bytes in Hex automatically to BASE64 string.
 		itemsJSON, err := model.ItemsToJSON(typedRecord.Items)
 		if err != nil {
@@ -89,7 +89,7 @@ func recordToQRecordOrError[T model.Items](batchID int64, record model.Record[T]
 		entries[4] = qvalue.QValueInt64{Val: 0}
 		entries[5] = qvalue.QValueString{Val: ""}
 		entries[7] = qvalue.QValueString{Val: ""}
-	case *model.UpdateRecord[T]:
+	case *model.UpdateRecord[Items]:
 		newItemsJSON, err := model.ItemsToJSON(typedRecord.NewItems)
 		if err != nil {
 			return model.QRecordOrError{
@@ -108,7 +108,7 @@ func recordToQRecordOrError[T model.Items](batchID int64, record model.Record[T]
 		entries[5] = qvalue.QValueString{Val: oldItemsJSON}
 		entries[7] = qvalue.QValueString{Val: KeysToString(typedRecord.UnchangedToastColumns)}
 
-	case *model.DeleteRecord[T]:
+	case *model.DeleteRecord[Items]:
 		itemsJSON, err := model.ItemsToJSON(typedRecord.Items)
 		if err != nil {
 			return model.QRecordOrError{
diff --git a/flow/model/cdc_stream.go b/flow/model/cdc_stream.go
index a58bf372dd..72eb1ad490 100644
--- a/flow/model/cdc_stream.go
+++ b/flow/model/cdc_stream.go
@@ -11,11 +11,9 @@ import (
 type CDCStream[T Items] struct {
 	// empty signal to indicate if the records are going to be empty or not.
 	emptySignal chan bool
-	// Records are a list of records.
 	records chan Record[T]
 	// Schema changes from the slot
 	SchemaDeltas []*protos.TableSchemaDelta
-	// Indicates if the last checkpoint has been set.
 	lastCheckpointSet bool
 	// lastCheckpointID is the last ID of the commit that corresponds to this batch.
 	lastCheckpointID atomic.Int64
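
Note on the convention: the type parameter is renamed to match its constraint wherever the constraint is referenced as the package-qualified model.Items. Inside package model itself, CDCStream[T Items] keeps T, since an unqualified [Items Items] would make the constraint reference resolve to the type parameter being declared and fail to compile. Below is a minimal, self-contained sketch of the naming convention; modelItems, pgItems, and pullBatch are hypothetical stand-ins for illustration, not PeerDB APIs.

package main

import "fmt"

// modelItems stands in for PeerDB's model.Items constraint;
// the real interface has a different, larger method set.
type modelItems interface {
	ToJSON() (string, error)
}

// pgItems is a toy implementation, loosely analogous to model.PgItems.
type pgItems map[string]string

func (p pgItems) ToJSON() (string, error) {
	return fmt.Sprintf("%v", p), nil
}

// Before: func pullBatch[T modelItems](records []T) error
// After: the type parameter is named after its constraint, so the
// signature reads in the domain's vocabulary rather than as
// single-letter algebra. In the real code this is legal because the
// constraint is written as the package-qualified model.Items, which
// cannot collide with the parameter name.
func pullBatch[Items modelItems](records []Items) error {
	for _, rec := range records {
		j, err := rec.ToJSON()
		if err != nil {
			return err
		}
		fmt.Println(j) // a raw-table row payload would be built from j here
	}
	return nil
}

func main() {
	// Items is inferred as pgItems at the call site.
	if err := pullBatch([]pgItems{{"id": "1"}}); err != nil {
		panic(err)
	}
}

The same trade-off shows up throughout the diff: signatures such as PullRecordsRequest[Items] now name the payload type in model terms, while multi-parameter lists keep prefixed names (TPull, TSync) where no single constraint name fits.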