diff --git a/flow/connectors/eventhub/eventhub.go b/flow/connectors/eventhub/eventhub.go index b614ec3295..3c03ede0e9 100644 --- a/flow/connectors/eventhub/eventhub.go +++ b/flow/connectors/eventhub/eventhub.go @@ -10,6 +10,7 @@ import ( "github.com/Azure/azure-sdk-for-go/sdk/azidentity" azeventhubs "github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs" + metadataStore "github.com/PeerDB-io/peer-flow/connectors/external_metadata" "github.com/PeerDB-io/peer-flow/connectors/utils" "github.com/PeerDB-io/peer-flow/connectors/utils/metrics" "github.com/PeerDB-io/peer-flow/generated/protos" @@ -22,7 +23,7 @@ import ( type EventHubConnector struct { ctx context.Context config *protos.EventHubGroupConfig - pgMetadata *PostgresMetadataStore + pgMetadata *metadataStore.PostgresMetadataStore tableSchemas map[string]*protos.TableSchema creds *azidentity.DefaultAzureCredential hubManager *EventHubManager @@ -40,7 +41,9 @@ func NewEventHubConnector( } hubManager := NewEventHubManager(ctx, defaultAzureCreds, config) - pgMetadata, err := NewPostgresMetadataStore(ctx, config.GetMetadataDb()) + metadataSchemaName := "peerdb_eventhub_metadata" + pgMetadata, err := metadataStore.NewPostgresMetadataStore(ctx, config.GetMetadataDb(), + metadataSchemaName) if err != nil { log.Errorf("failed to create postgres metadata store: %v", err) return nil, err @@ -84,6 +87,48 @@ func (c *EventHubConnector) InitializeTableSchema(req map[string]*protos.TableSc return nil } +func (c *EventHubConnector) NeedsSetupMetadataTables() bool { + return c.pgMetadata.NeedsSetupMetadata() +} + +func (c *EventHubConnector) SetupMetadataTables() error { + err := c.pgMetadata.SetupMetadata() + if err != nil { + log.Errorf("failed to setup metadata tables: %v", err) + return err + } + + return nil +} + +func (c *EventHubConnector) GetLastSyncBatchID(jobName string) (int64, error) { + syncBatchID, err := c.pgMetadata.GetLastBatchID(jobName) + if err != nil { + return 0, err + } + + return syncBatchID, nil +} + +func (c *EventHubConnector) GetLastOffset(jobName string) (*protos.LastSyncState, error) { + res, err := c.pgMetadata.FetchLastOffset(jobName) + if err != nil { + return nil, err + } + + return res, nil +} + +func (c *EventHubConnector) updateLastOffset(jobName string, offset int64) error { + err := c.pgMetadata.UpdateLastOffset(jobName, offset) + if err != nil { + log.Errorf("failed to update last offset: %v", err) + return err + } + + return nil +} + func (c *EventHubConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.SyncResponse, error) { shutdown := utils.HeartbeatRoutine(c.ctx, 10*time.Second, func() string { return fmt.Sprintf("syncing records to eventhub with"+ @@ -177,7 +222,7 @@ func (c *EventHubConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S log.Errorf("failed to update last offset: %v", err) return nil, err } - err = c.incrementSyncBatchID(req.FlowJobName) + err = c.pgMetadata.IncrementID(req.FlowJobName) if err != nil { log.Errorf("%v", err) return nil, err @@ -311,3 +356,11 @@ func (c *EventHubConnector) SetupNormalizedTables( TableExistsMapping: nil, }, nil } + +func (c *EventHubConnector) SyncFlowCleanup(jobName string) error { + err := c.pgMetadata.DropMetadata(jobName) + if err != nil { + return err + } + return nil +} diff --git a/flow/connectors/eventhub/metadata.go b/flow/connectors/eventhub/metadata.go deleted file mode 100644 index 178ebbb076..0000000000 --- a/flow/connectors/eventhub/metadata.go +++ /dev/null @@ -1,236 +0,0 @@ -package conneventhub - -import ( - "context" - - 
"github.com/PeerDB-io/peer-flow/connectors/utils" - "github.com/PeerDB-io/peer-flow/generated/protos" - "github.com/jackc/pgx/v5/pgxpool" - log "github.com/sirupsen/logrus" -) - -const ( - // schema for the peerdb metadata - metadataSchema = "peerdb_eventhub_metadata" - // The name of the table that stores the last sync state. - lastSyncStateTableName = "last_sync_state" -) - -type PostgresMetadataStore struct { - config *protos.PostgresConfig - pool *pgxpool.Pool -} - -func NewPostgresMetadataStore(ctx context.Context, pgConfig *protos.PostgresConfig) (*PostgresMetadataStore, error) { - connectionString := utils.GetPGConnectionString(pgConfig) - - pool, err := pgxpool.New(ctx, connectionString) - if err != nil { - log.Errorf("failed to create connection pool: %v", err) - return nil, err - } - log.Info("created connection pool for eventhub metadata store") - - return &PostgresMetadataStore{ - config: pgConfig, - pool: pool, - }, nil -} - -func (p *PostgresMetadataStore) Close() error { - if p.pool != nil { - p.pool.Close() - } - - return nil -} - -func (c *EventHubConnector) NeedsSetupMetadataTables() bool { - ms := c.pgMetadata - - // check if schema exists - rows := ms.pool.QueryRow(c.ctx, "SELECT count(*) FROM pg_catalog.pg_namespace WHERE nspname = $1", metadataSchema) - - var exists int64 - err := rows.Scan(&exists) - if err != nil { - log.Errorf("failed to check if schema exists: %v", err) - return false - } - - if exists > 0 { - return true - } - - return true -} - -func (c *EventHubConnector) SetupMetadataTables() error { - ms := c.pgMetadata - - // start a transaction - tx, err := ms.pool.Begin(c.ctx) - if err != nil { - log.Errorf("failed to start transaction: %v", err) - return err - } - - // create the schema - _, err = tx.Exec(c.ctx, "CREATE SCHEMA IF NOT EXISTS "+metadataSchema) - if err != nil { - log.Errorf("failed to create schema: %v", err) - return err - } - - // create the last sync state table - _, err = tx.Exec(c.ctx, ` - CREATE TABLE IF NOT EXISTS `+metadataSchema+`.`+lastSyncStateTableName+` ( - job_name TEXT PRIMARY KEY NOT NULL, - last_offset BIGINT NOT NULL, - updated_at TIMESTAMP NOT NULL DEFAULT NOW(), - sync_batch_id BIGINT NOT NULL - ) - `) - if err != nil { - log.Errorf("failed to create last sync state table: %v", err) - return err - } - - // commit the transaction - err = tx.Commit(c.ctx) - if err != nil { - log.Errorf("failed to commit transaction: %v", err) - return err - } - - return nil -} - -func (c *EventHubConnector) GetLastOffset(jobName string) (*protos.LastSyncState, error) { - ms := c.pgMetadata - - rows := ms.pool.QueryRow(c.ctx, ` - SELECT last_offset - FROM `+metadataSchema+`.`+lastSyncStateTableName+` - WHERE job_name = $1 - `, jobName) - - var offset int64 - err := rows.Scan(&offset) - if err != nil { - // if the job doesn't exist, return 0 - if err.Error() == "no rows in result set" { - return &protos.LastSyncState{ - Checkpoint: 0, - }, nil - } - - log.WithFields(log.Fields{ - "flowName": jobName, - }).Errorf("failed to get last offset: %v", err) - return nil, err - } - - log.Infof("got last offset for job `%s`: %d", jobName, offset) - - return &protos.LastSyncState{ - Checkpoint: offset, - }, nil -} - -func (c *EventHubConnector) GetLastSyncBatchID(jobName string) (int64, error) { - ms := c.pgMetadata - - rows := ms.pool.QueryRow(c.ctx, ` - SELECT sync_batch_id - FROM `+metadataSchema+`.`+lastSyncStateTableName+` - WHERE job_name = $1 - `, jobName) - - var syncBatchID int64 - err := rows.Scan(&syncBatchID) - if err != nil { - // if the 
job doesn't exist, return 0 - if err.Error() == "no rows in result set" { - return 0, nil - } - - log.WithFields(log.Fields{ - "flowName": jobName, - }).Errorf("failed to get last offset: %v", err) - return 0, err - } - - log.Infof("got last sync batch ID for job `%s`: %d", jobName, syncBatchID) - - return syncBatchID, nil -} - -// update offset for a job -func (c *EventHubConnector) updateLastOffset(jobName string, offset int64) error { - ms := c.pgMetadata - - // start a transaction - tx, err := ms.pool.Begin(c.ctx) - if err != nil { - log.Errorf("failed to start transaction: %v", err) - return err - } - - // update the last offset - log.WithFields(log.Fields{ - "flowName": jobName, - }).Infof("updating last offset for job `%s` to `%d`", jobName, offset) - _, err = tx.Exec(c.ctx, ` - INSERT INTO `+metadataSchema+`.`+lastSyncStateTableName+` (job_name, last_offset, sync_batch_id) - VALUES ($1, $2, $3) - ON CONFLICT (job_name) - DO UPDATE SET last_offset = $2, updated_at = NOW() - `, jobName, offset, 0) - - if err != nil { - log.WithFields(log.Fields{ - "flowName": jobName, - }).Errorf("failed to update last offset: %v", err) - return err - } - - // commit the transaction - err = tx.Commit(c.ctx) - if err != nil { - log.Errorf("failed to commit transaction: %v", err) - return err - } - - return nil -} - -// update offset for a job -func (c *EventHubConnector) incrementSyncBatchID(jobName string) error { - ms := c.pgMetadata - - log.WithFields(log.Fields{ - "flowName": jobName, - }).Infof("incrementing sync batch id for job `%s`", jobName) - _, err := ms.pool.Exec(c.ctx, ` - UPDATE `+metadataSchema+`.`+lastSyncStateTableName+` - SET sync_batch_id=sync_batch_id+1 WHERE job_name=$1 - `, jobName) - - if err != nil { - log.WithFields(log.Fields{ - "flowName": jobName, - }).Errorf("failed to increment sync batch id: %v", err) - return err - } - - return nil -} - -func (c *EventHubConnector) SyncFlowCleanup(jobName string) error { - _, err := c.pgMetadata.pool.Exec(c.ctx, ` - DELETE FROM `+metadataSchema+`.`+lastSyncStateTableName+` - WHERE job_name = $1 - `, jobName) - return err -} diff --git a/flow/connectors/s3/metadata.go b/flow/connectors/external_metadata/store.go similarity index 66% rename from flow/connectors/s3/metadata.go rename to flow/connectors/external_metadata/store.go index fb0a248414..68ff6a0689 100644 --- a/flow/connectors/s3/metadata.go +++ b/flow/connectors/external_metadata/store.go @@ -1,4 +1,4 @@ -package conns3 +package connmetadata import ( "context" @@ -10,16 +10,18 @@ import ( ) const ( - metadataSchema = "peerdb_s3_metadata" lastSyncStateTableName = "last_sync_state" ) type PostgresMetadataStore struct { - config *protos.PostgresConfig - pool *pgxpool.Pool + ctx context.Context + config *protos.PostgresConfig + pool *pgxpool.Pool + schemaName string } -func NewPostgresMetadataStore(ctx context.Context, pgConfig *protos.PostgresConfig) (*PostgresMetadataStore, error) { +func NewPostgresMetadataStore(ctx context.Context, pgConfig *protos.PostgresConfig, + schemaName string) (*PostgresMetadataStore, error) { connectionString := utils.GetPGConnectionString(pgConfig) pool, err := pgxpool.New(ctx, connectionString) @@ -27,11 +29,13 @@ func NewPostgresMetadataStore(ctx context.Context, pgConfig *protos.PostgresConf log.Errorf("failed to create connection pool: %v", err) return nil, err } - log.Info("created connection pool for s3 metadata store") + log.Info("created connection pool for metadata store") return &PostgresMetadataStore{ - config: pgConfig, - pool: pool, + ctx: 
ctx, + config: pgConfig, + pool: pool, + schemaName: schemaName, }, nil } @@ -43,11 +47,9 @@ func (p *PostgresMetadataStore) Close() error { return nil } -func (c *S3Connector) NeedsSetupMetadataTables() bool { - ms := c.pgMetadata - +func (p *PostgresMetadataStore) NeedsSetupMetadata() bool { // check if schema exists - rows := ms.pool.QueryRow(c.ctx, "SELECT count(*) FROM pg_catalog.pg_namespace WHERE nspname = $1", metadataSchema) + rows := p.pool.QueryRow(p.ctx, "SELECT count(*) FROM pg_catalog.pg_namespace WHERE nspname = $1", p.schemaName) var exists int64 err := rows.Scan(&exists) @@ -63,26 +65,24 @@ func (c *S3Connector) NeedsSetupMetadataTables() bool { return true } -func (c *S3Connector) SetupMetadataTables() error { - ms := c.pgMetadata - +func (p *PostgresMetadataStore) SetupMetadata() error { // start a transaction - tx, err := ms.pool.Begin(c.ctx) + tx, err := p.pool.Begin(p.ctx) if err != nil { log.Errorf("failed to start transaction: %v", err) return err } // create the schema - _, err = tx.Exec(c.ctx, "CREATE SCHEMA IF NOT EXISTS "+metadataSchema) + _, err = tx.Exec(p.ctx, "CREATE SCHEMA IF NOT EXISTS "+p.schemaName) if err != nil { log.Errorf("failed to create schema: %v", err) return err } // create the last sync state table - _, err = tx.Exec(c.ctx, ` - CREATE TABLE IF NOT EXISTS `+metadataSchema+`.`+lastSyncStateTableName+` ( + _, err = tx.Exec(p.ctx, ` + CREATE TABLE IF NOT EXISTS `+p.schemaName+`.`+lastSyncStateTableName+` ( job_name TEXT PRIMARY KEY NOT NULL, last_offset BIGINT NOT NULL, updated_at TIMESTAMP NOT NULL DEFAULT NOW(), @@ -93,9 +93,10 @@ func (c *S3Connector) SetupMetadataTables() error { log.Errorf("failed to create last sync state table: %v", err) return err } + log.Infof("created external metadata table %s.%s", p.schemaName, lastSyncStateTableName) // commit the transaction - err = tx.Commit(c.ctx) + err = tx.Commit(p.ctx) if err != nil { log.Errorf("failed to commit transaction: %v", err) return err @@ -104,15 +105,12 @@ func (c *S3Connector) SetupMetadataTables() error { return nil } -func (c *S3Connector) GetLastOffset(jobName string) (*protos.LastSyncState, error) { - ms := c.pgMetadata - - rows := ms.pool.QueryRow(c.ctx, ` +func (p *PostgresMetadataStore) FetchLastOffset(jobName string) (*protos.LastSyncState, error) { + rows := p.pool.QueryRow(p.ctx, ` SELECT last_offset - FROM `+metadataSchema+`.`+lastSyncStateTableName+` + FROM `+p.schemaName+`.`+lastSyncStateTableName+` WHERE job_name = $1 `, jobName) - var offset int64 err := rows.Scan(&offset) if err != nil { @@ -136,12 +134,10 @@ func (c *S3Connector) GetLastOffset(jobName string) (*protos.LastSyncState, erro }, nil } -func (c *S3Connector) GetLastSyncBatchID(jobName string) (int64, error) { - ms := c.pgMetadata - - rows := ms.pool.QueryRow(c.ctx, ` +func (p *PostgresMetadataStore) GetLastBatchID(jobName string) (int64, error) { + rows := p.pool.QueryRow(p.ctx, ` SELECT sync_batch_id - FROM `+metadataSchema+`.`+lastSyncStateTableName+` + FROM `+p.schemaName+`.`+lastSyncStateTableName+` WHERE job_name = $1 `, jobName) @@ -165,11 +161,10 @@ func (c *S3Connector) GetLastSyncBatchID(jobName string) (int64, error) { } // update offset for a job -func (c *S3Connector) updateLastOffset(jobName string, offset int64) error { - ms := c.pgMetadata +func (p *PostgresMetadataStore) UpdateLastOffset(jobName string, offset int64) error { // start a transaction - tx, err := ms.pool.Begin(c.ctx) + tx, err := p.pool.Begin(p.ctx) if err != nil { log.Errorf("failed to start transaction: %v", err) return err 
@@ -179,8 +174,8 @@ func (c *S3Connector) updateLastOffset(jobName string, offset int64) error { log.WithFields(log.Fields{ "flowName": jobName, }).Infof("updating last offset for job `%s` to `%d`", jobName, offset) - _, err = tx.Exec(c.ctx, ` - INSERT INTO `+metadataSchema+`.`+lastSyncStateTableName+` (job_name, last_offset, sync_batch_id) + _, err = tx.Exec(p.ctx, ` + INSERT INTO `+p.schemaName+`.`+lastSyncStateTableName+` (job_name, last_offset, sync_batch_id) VALUES ($1, $2, $3) ON CONFLICT (job_name) DO UPDATE SET last_offset = $2, updated_at = NOW() @@ -194,7 +189,7 @@ func (c *S3Connector) updateLastOffset(jobName string, offset int64) error { } // commit the transaction - err = tx.Commit(c.ctx) + err = tx.Commit(p.ctx) if err != nil { log.Errorf("failed to commit transaction: %v", err) return err @@ -204,14 +199,13 @@ func (c *S3Connector) updateLastOffset(jobName string, offset int64) error { } // update offset for a job -func (c *S3Connector) incrementSyncBatchID(jobName string) error { - ms := c.pgMetadata +func (p *PostgresMetadataStore) IncrementID(jobName string) error { log.WithFields(log.Fields{ "flowName": jobName, }).Infof("incrementing sync batch id for job `%s`", jobName) - _, err := ms.pool.Exec(c.ctx, ` - UPDATE `+metadataSchema+`.`+lastSyncStateTableName+` + _, err := p.pool.Exec(p.ctx, ` + UPDATE `+p.schemaName+`.`+lastSyncStateTableName+` SET sync_batch_id=sync_batch_id+1 WHERE job_name=$1 `, jobName) @@ -225,9 +219,9 @@ func (c *S3Connector) incrementSyncBatchID(jobName string) error { return nil } -func (c *S3Connector) SyncFlowCleanup(jobName string) error { - _, err := c.pgMetadata.pool.Exec(c.ctx, ` - DELETE FROM `+metadataSchema+`.`+lastSyncStateTableName+` +func (p *PostgresMetadataStore) DropMetadata(jobName string) error { + _, err := p.pool.Exec(p.ctx, ` + DELETE FROM `+p.schemaName+`.`+lastSyncStateTableName+` WHERE job_name = $1 `, jobName) return err diff --git a/flow/connectors/s3/s3.go b/flow/connectors/s3/s3.go index 9a78757ea3..a96dcb481a 100644 --- a/flow/connectors/s3/s3.go +++ b/flow/connectors/s3/s3.go @@ -5,20 +5,19 @@ import ( "fmt" "time" + metadataStore "github.com/PeerDB-io/peer-flow/connectors/external_metadata" "github.com/PeerDB-io/peer-flow/connectors/utils" "github.com/PeerDB-io/peer-flow/connectors/utils/metrics" "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/PeerDB-io/peer-flow/model" - "github.com/PeerDB-io/peer-flow/model/qvalue" "github.com/aws/aws-sdk-go/service/s3" - "github.com/google/uuid" log "github.com/sirupsen/logrus" ) type S3Connector struct { ctx context.Context url string - pgMetadata *PostgresMetadataStore + pgMetadata *metadataStore.PostgresMetadataStore client s3.S3 creds utils.S3PeerCredentials } @@ -56,7 +55,9 @@ func NewS3Connector(ctx context.Context, if err != nil { return nil, fmt.Errorf("failed to create S3 client: %w", err) } - pgMetadata, err := NewPostgresMetadataStore(ctx, config.GetMetadataDb()) + metadataSchemaName := "peerdb_s3_metadata" + pgMetadata, err := metadataStore.NewPostgresMetadataStore(ctx, + config.GetMetadataDb(), metadataSchemaName) if err != nil { log.Errorf("failed to create postgres metadata store: %v", err) return nil, err @@ -71,11 +72,69 @@ func NewS3Connector(ctx context.Context, }, nil } +func (c *S3Connector) CreateRawTable(req *protos.CreateRawTableInput) (*protos.CreateRawTableOutput, error) { + log.Infof("CreateRawTable for S3 is a no-op") + return nil, nil +} + +func (c *S3Connector) InitializeTableSchema(req map[string]*protos.TableSchema) error { + 
log.Infof("InitializeTableSchema for S3 is a no-op") + return nil +} + func (c *S3Connector) Close() error { log.Debugf("Closing s3 connector is a noop") return nil } +func (c *S3Connector) ConnectionActive() bool { + _, err := c.client.ListBuckets(nil) + return err == nil +} + +func (c *S3Connector) NeedsSetupMetadataTables() bool { + return c.pgMetadata.NeedsSetupMetadata() +} + +func (c *S3Connector) SetupMetadataTables() error { + err := c.pgMetadata.SetupMetadata() + if err != nil { + log.Errorf("failed to setup metadata tables: %v", err) + return err + } + + return nil +} + +func (c *S3Connector) GetLastSyncBatchID(jobName string) (int64, error) { + syncBatchID, err := c.pgMetadata.GetLastBatchID(jobName) + if err != nil { + return 0, err + } + + return syncBatchID, nil +} + +func (c *S3Connector) GetLastOffset(jobName string) (*protos.LastSyncState, error) { + res, err := c.pgMetadata.FetchLastOffset(jobName) + if err != nil { + return nil, err + } + + return res, nil +} + +// update offset for a job +func (c *S3Connector) updateLastOffset(jobName string, offset int64) error { + err := c.pgMetadata.UpdateLastOffset(jobName, offset) + if err != nil { + log.Errorf("failed to update last offset: %v", err) + return err + } + + return nil +} + func (c *S3Connector) SyncRecords(req *model.SyncRecordsRequest) (*model.SyncResponse, error) { if len(req.Records.Records) == 0 { return &model.SyncResponse{ @@ -91,181 +150,19 @@ func (c *S3Connector) SyncRecords(req *model.SyncRecordsRequest) (*model.SyncRes } syncBatchID = syncBatchID + 1 lastCP := req.Records.LastCheckPointID - recordStream := model.NewQRecordStream(len(req.Records.Records)) - err = recordStream.SetSchema(&model.QRecordSchema{ - Fields: []*model.QField{ - { - Name: "_peerdb_uid", - Type: qvalue.QValueKindString, - Nullable: false, - }, - { - Name: "_peerdb_timestamp", - Type: qvalue.QValueKindInt64, - Nullable: false, - }, - { - Name: "_peerdb_destination_table_name", - Type: qvalue.QValueKindString, - Nullable: false, - }, - { - Name: "_peerdb_data", - Type: qvalue.QValueKindString, - Nullable: false, - }, - { - Name: "_peerdb_record_type", - Type: qvalue.QValueKindInt64, - Nullable: true, - }, - { - Name: "_peerdb_match_data", - Type: qvalue.QValueKindString, - Nullable: true, - }, - { - Name: "_peerdb_batch_id", - Type: qvalue.QValueKindInt64, - Nullable: true, - }, - { - Name: "_peerdb_unchanged_toast_columns", - Type: qvalue.QValueKindString, - Nullable: true, - }, - }, + tableNameRowsMapping := make(map[string]uint32) + streamRes, err := utils.RecordsToRawTableStream(model.RecordsToStreamRequest{ + Records: req.Records.Records, + TableMapping: tableNameRowsMapping, + CP: 0, + BatchID: syncBatchID, }) if err != nil { - return nil, err - } - - first := true - var firstCP int64 = 0 - tableNameRowsMapping := make(map[string]uint32) - - for _, record := range req.Records.Records { - var entries [8]qvalue.QValue - switch typedRecord := record.(type) { - case *model.InsertRecord: - // json.Marshal converts bytes in Hex automatically to BASE64 string. 
- itemsJSON, err := typedRecord.Items.ToJSON() - if err != nil { - return nil, fmt.Errorf("failed to serialize insert record items to JSON: %w", err) - } - - // add insert record to the raw table - entries[2] = qvalue.QValue{ - Kind: qvalue.QValueKindString, - Value: typedRecord.DestinationTableName, - } - entries[3] = qvalue.QValue{ - Kind: qvalue.QValueKindString, - Value: itemsJSON, - } - entries[4] = qvalue.QValue{ - Kind: qvalue.QValueKindInt64, - Value: 0, - } - entries[5] = qvalue.QValue{ - Kind: qvalue.QValueKindString, - Value: "", - } - entries[7] = qvalue.QValue{ - Kind: qvalue.QValueKindString, - Value: utils.KeysToString(typedRecord.UnchangedToastColumns), - } - tableNameRowsMapping[typedRecord.DestinationTableName] += 1 - case *model.UpdateRecord: - newItemsJSON, err := typedRecord.NewItems.ToJSON() - if err != nil { - return nil, fmt.Errorf("failed to serialize update record new items to JSON: %w", err) - } - oldItemsJSON, err := typedRecord.OldItems.ToJSON() - if err != nil { - return nil, fmt.Errorf("failed to serialize update record old items to JSON: %w", err) - } - - entries[2] = qvalue.QValue{ - Kind: qvalue.QValueKindString, - Value: typedRecord.DestinationTableName, - } - entries[3] = qvalue.QValue{ - Kind: qvalue.QValueKindString, - Value: newItemsJSON, - } - entries[4] = qvalue.QValue{ - Kind: qvalue.QValueKindInt64, - Value: 1, - } - entries[5] = qvalue.QValue{ - Kind: qvalue.QValueKindString, - Value: oldItemsJSON, - } - entries[7] = qvalue.QValue{ - Kind: qvalue.QValueKindString, - Value: utils.KeysToString(typedRecord.UnchangedToastColumns), - } - tableNameRowsMapping[typedRecord.DestinationTableName] += 1 - case *model.DeleteRecord: - itemsJSON, err := typedRecord.Items.ToJSON() - if err != nil { - return nil, fmt.Errorf("failed to serialize delete record items to JSON: %w", err) - } - - // append delete record to the raw table - entries[2] = qvalue.QValue{ - Kind: qvalue.QValueKindString, - Value: typedRecord.DestinationTableName, - } - entries[3] = qvalue.QValue{ - Kind: qvalue.QValueKindString, - Value: itemsJSON, - } - entries[4] = qvalue.QValue{ - Kind: qvalue.QValueKindInt64, - Value: 2, - } - entries[5] = qvalue.QValue{ - Kind: qvalue.QValueKindString, - Value: itemsJSON, - } - entries[7] = qvalue.QValue{ - Kind: qvalue.QValueKindString, - Value: utils.KeysToString(typedRecord.UnchangedToastColumns), - } - tableNameRowsMapping[typedRecord.DestinationTableName] += 1 - default: - return nil, fmt.Errorf("record type %T not supported in Snowflake flow connector", typedRecord) - } - - if first { - firstCP = record.GetCheckPointID() - first = false - } - - entries[0] = qvalue.QValue{ - Kind: qvalue.QValueKindString, - Value: uuid.New().String(), - } - entries[1] = qvalue.QValue{ - Kind: qvalue.QValueKindInt64, - Value: time.Now().UnixNano(), - } - entries[6] = qvalue.QValue{ - Kind: qvalue.QValueKindInt64, - Value: syncBatchID, - } - - recordStream.Records <- &model.QRecordOrError{ - Record: &model.QRecord{ - NumEntries: 8, - Entries: entries[:], - }, - } + return nil, fmt.Errorf("failed to convert records to raw table stream: %w", err) } - + firstCP := streamRes.CP + recordStream := streamRes.Stream qrepConfig := &protos.QRepConfig{ FlowJobName: req.FlowJobName, DestinationTableIdentifier: fmt.Sprintf("raw_table_%s", req.FlowJobName), @@ -285,7 +182,7 @@ func (c *S3Connector) SyncRecords(req *model.SyncRecordsRequest) (*model.SyncRes log.Errorf("failed to update last offset for s3 cdc: %v", err) return nil, err } - err = 
c.incrementSyncBatchID(req.FlowJobName) + err = c.pgMetadata.IncrementID(req.FlowJobName) if err != nil { log.Errorf("%v", err) return nil, err @@ -299,16 +196,6 @@ func (c *S3Connector) SyncRecords(req *model.SyncRecordsRequest) (*model.SyncRes }, nil } -func (c *S3Connector) CreateRawTable(req *protos.CreateRawTableInput) (*protos.CreateRawTableOutput, error) { - log.Infof("CreateRawTable for S3 is a no-op") - return nil, nil -} - -func (c *S3Connector) InitializeTableSchema(req map[string]*protos.TableSchema) error { - log.Infof("InitializeTableSchema for S3 is a no-op") - return nil -} - func (c *S3Connector) SetupNormalizedTables(req *protos.SetupNormalizedTableBatchInput) ( *protos.SetupNormalizedTableBatchOutput, error) { @@ -316,7 +203,10 @@ func (c *S3Connector) SetupNormalizedTables(req *protos.SetupNormalizedTableBatc return nil, nil } -func (c *S3Connector) ConnectionActive() bool { - _, err := c.client.ListBuckets(nil) - return err == nil +func (c *S3Connector) SyncFlowCleanup(jobName string) error { + err := c.pgMetadata.DropMetadata(jobName) + if err != nil { + return err + } + return nil } diff --git a/flow/connectors/snowflake/snowflake.go b/flow/connectors/snowflake/snowflake.go index 4335ee85e8..7046746ecc 100644 --- a/flow/connectors/snowflake/snowflake.go +++ b/flow/connectors/snowflake/snowflake.go @@ -645,183 +645,20 @@ func (c *SnowflakeConnector) syncRecordsViaSQL(req *model.SyncRecordsRequest, ra func (c *SnowflakeConnector) syncRecordsViaAvro(req *model.SyncRecordsRequest, rawTableIdentifier string, syncBatchID int64) (*model.SyncResponse, error) { - recordStream := model.NewQRecordStream(len(req.Records.Records)) - - err := recordStream.SetSchema(&model.QRecordSchema{ - Fields: []*model.QField{ - { - Name: "_peerdb_uid", - Type: qvalue.QValueKindString, - Nullable: false, - }, - { - Name: "_peerdb_timestamp", - Type: qvalue.QValueKindInt64, - Nullable: false, - }, - { - Name: "_peerdb_destination_table_name", - Type: qvalue.QValueKindString, - Nullable: false, - }, - { - Name: "_peerdb_data", - Type: qvalue.QValueKindString, - Nullable: false, - }, - { - Name: "_peerdb_record_type", - Type: qvalue.QValueKindInt64, - Nullable: true, - }, - { - Name: "_peerdb_match_data", - Type: qvalue.QValueKindString, - Nullable: true, - }, - { - Name: "_peerdb_batch_id", - Type: qvalue.QValueKindInt64, - Nullable: true, - }, - { - Name: "_peerdb_unchanged_toast_columns", - Type: qvalue.QValueKindString, - Nullable: true, - }, - }, - }) - if err != nil { - return nil, err - } - first := true - var firstCP int64 = 0 lastCP := req.Records.LastCheckPointID tableNameRowsMapping := make(map[string]uint32) - - for _, record := range req.Records.Records { - var entries [8]qvalue.QValue - switch typedRecord := record.(type) { - case *model.InsertRecord: - // json.Marshal converts bytes in Hex automatically to BASE64 string. 
- itemsJSON, err := typedRecord.Items.ToJSON() - if err != nil { - return nil, fmt.Errorf("failed to serialize insert record items to JSON: %w", err) - } - - // add insert record to the raw table - entries[2] = qvalue.QValue{ - Kind: qvalue.QValueKindString, - Value: typedRecord.DestinationTableName, - } - entries[3] = qvalue.QValue{ - Kind: qvalue.QValueKindString, - Value: itemsJSON, - } - entries[4] = qvalue.QValue{ - Kind: qvalue.QValueKindInt64, - Value: 0, - } - entries[5] = qvalue.QValue{ - Kind: qvalue.QValueKindString, - Value: "", - } - entries[7] = qvalue.QValue{ - Kind: qvalue.QValueKindString, - Value: utils.KeysToString(typedRecord.UnchangedToastColumns), - } - tableNameRowsMapping[typedRecord.DestinationTableName] += 1 - case *model.UpdateRecord: - newItemsJSON, err := typedRecord.NewItems.ToJSON() - if err != nil { - return nil, fmt.Errorf("failed to serialize update record new items to JSON: %w", err) - } - oldItemsJSON, err := typedRecord.OldItems.ToJSON() - if err != nil { - return nil, fmt.Errorf("failed to serialize update record old items to JSON: %w", err) - } - - // add update record to the raw table - entries[2] = qvalue.QValue{ - Kind: qvalue.QValueKindString, - Value: typedRecord.DestinationTableName, - } - entries[3] = qvalue.QValue{ - Kind: qvalue.QValueKindString, - Value: newItemsJSON, - } - entries[4] = qvalue.QValue{ - Kind: qvalue.QValueKindInt64, - Value: 1, - } - entries[5] = qvalue.QValue{ - Kind: qvalue.QValueKindString, - Value: oldItemsJSON, - } - entries[7] = qvalue.QValue{ - Kind: qvalue.QValueKindString, - Value: utils.KeysToString(typedRecord.UnchangedToastColumns), - } - tableNameRowsMapping[typedRecord.DestinationTableName] += 1 - case *model.DeleteRecord: - itemsJSON, err := typedRecord.Items.ToJSON() - if err != nil { - return nil, fmt.Errorf("failed to serialize delete record items to JSON: %w", err) - } - - // append delete record to the raw table - entries[2] = qvalue.QValue{ - Kind: qvalue.QValueKindString, - Value: typedRecord.DestinationTableName, - } - entries[3] = qvalue.QValue{ - Kind: qvalue.QValueKindString, - Value: itemsJSON, - } - entries[4] = qvalue.QValue{ - Kind: qvalue.QValueKindInt64, - Value: 2, - } - entries[5] = qvalue.QValue{ - Kind: qvalue.QValueKindString, - Value: itemsJSON, - } - entries[7] = qvalue.QValue{ - Kind: qvalue.QValueKindString, - Value: utils.KeysToString(typedRecord.UnchangedToastColumns), - } - tableNameRowsMapping[typedRecord.DestinationTableName] += 1 - default: - return nil, fmt.Errorf("record type %T not supported in Snowflake flow connector", typedRecord) - } - - if first { - firstCP = record.GetCheckPointID() - first = false - } - - entries[0] = qvalue.QValue{ - Kind: qvalue.QValueKindString, - Value: uuid.New().String(), - } - entries[1] = qvalue.QValue{ - Kind: qvalue.QValueKindInt64, - Value: time.Now().UnixNano(), - } - entries[6] = qvalue.QValue{ - Kind: qvalue.QValueKindInt64, - Value: syncBatchID, - } - - recordStream.Records <- &model.QRecordOrError{ - Record: &model.QRecord{ - NumEntries: 8, - Entries: entries[:], - }, - } + streamRes, err := utils.RecordsToRawTableStream(model.RecordsToStreamRequest{ + Records: req.Records.Records, + TableMapping: tableNameRowsMapping, + CP: 0, + BatchID: syncBatchID, + }) + if err != nil { + return nil, fmt.Errorf("failed to convert records to raw table stream: %w", err) } - + firstCP := streamRes.CP + recordStream := streamRes.Stream qrepConfig := &protos.QRepConfig{ StagingPath: "", FlowJobName: req.FlowJobName, @@ -841,7 +678,6 @@ func (c 
*SnowflakeConnector) syncRecordsViaAvro(req *model.SyncRecordsRequest, r return nil, err } metrics.LogSyncMetrics(c.ctx, req.FlowJobName, int64(numRecords), time.Since(startTime)) - return &model.SyncResponse{ FirstSyncedCheckPointID: firstCP, LastSyncedCheckPointID: lastCP, diff --git a/flow/connectors/utils/stream.go b/flow/connectors/utils/stream.go new file mode 100644 index 0000000000..2d8ed15b9b --- /dev/null +++ b/flow/connectors/utils/stream.go @@ -0,0 +1,189 @@ +package utils + +import ( + "fmt" + "time" + + "github.com/PeerDB-io/peer-flow/model" + "github.com/PeerDB-io/peer-flow/model/qvalue" + "github.com/google/uuid" +) + +func RecordsToRawTableStream(req model.RecordsToStreamRequest) (*model.RecordsToStreamResponse, error) { + recordStream := model.NewQRecordStream(len(req.Records)) + err := recordStream.SetSchema(&model.QRecordSchema{ + Fields: []*model.QField{ + { + Name: "_peerdb_uid", + Type: qvalue.QValueKindString, + Nullable: false, + }, + { + Name: "_peerdb_timestamp", + Type: qvalue.QValueKindInt64, + Nullable: false, + }, + { + Name: "_peerdb_destination_table_name", + Type: qvalue.QValueKindString, + Nullable: false, + }, + { + Name: "_peerdb_data", + Type: qvalue.QValueKindString, + Nullable: false, + }, + { + Name: "_peerdb_record_type", + Type: qvalue.QValueKindInt64, + Nullable: true, + }, + { + Name: "_peerdb_match_data", + Type: qvalue.QValueKindString, + Nullable: true, + }, + { + Name: "_peerdb_batch_id", + Type: qvalue.QValueKindInt64, + Nullable: true, + }, + { + Name: "_peerdb_unchanged_toast_columns", + Type: qvalue.QValueKindString, + Nullable: true, + }, + }, + }) + if err != nil { + return nil, err + } + + first := true + var firstCP int64 = req.CP + for _, record := range req.Records { + var entries [8]qvalue.QValue + switch typedRecord := record.(type) { + case *model.InsertRecord: + // json.Marshal converts bytes in Hex automatically to BASE64 string. 
+ itemsJSON, err := typedRecord.Items.ToJSON() + if err != nil { + return nil, fmt.Errorf("failed to serialize insert record items to JSON: %w", err) + } + + // add insert record to the raw table + entries[2] = qvalue.QValue{ + Kind: qvalue.QValueKindString, + Value: typedRecord.DestinationTableName, + } + entries[3] = qvalue.QValue{ + Kind: qvalue.QValueKindString, + Value: itemsJSON, + } + entries[4] = qvalue.QValue{ + Kind: qvalue.QValueKindInt64, + Value: 0, + } + entries[5] = qvalue.QValue{ + Kind: qvalue.QValueKindString, + Value: "", + } + entries[7] = qvalue.QValue{ + Kind: qvalue.QValueKindString, + Value: KeysToString(typedRecord.UnchangedToastColumns), + } + req.TableMapping[typedRecord.DestinationTableName] += 1 + case *model.UpdateRecord: + newItemsJSON, err := typedRecord.NewItems.ToJSON() + if err != nil { + return nil, fmt.Errorf("failed to serialize update record new items to JSON: %w", err) + } + oldItemsJSON, err := typedRecord.OldItems.ToJSON() + if err != nil { + return nil, fmt.Errorf("failed to serialize update record old items to JSON: %w", err) + } + + entries[2] = qvalue.QValue{ + Kind: qvalue.QValueKindString, + Value: typedRecord.DestinationTableName, + } + entries[3] = qvalue.QValue{ + Kind: qvalue.QValueKindString, + Value: newItemsJSON, + } + entries[4] = qvalue.QValue{ + Kind: qvalue.QValueKindInt64, + Value: 1, + } + entries[5] = qvalue.QValue{ + Kind: qvalue.QValueKindString, + Value: oldItemsJSON, + } + entries[7] = qvalue.QValue{ + Kind: qvalue.QValueKindString, + Value: KeysToString(typedRecord.UnchangedToastColumns), + } + req.TableMapping[typedRecord.DestinationTableName] += 1 + case *model.DeleteRecord: + itemsJSON, err := typedRecord.Items.ToJSON() + if err != nil { + return nil, fmt.Errorf("failed to serialize delete record items to JSON: %w", err) + } + + // append delete record to the raw table + entries[2] = qvalue.QValue{ + Kind: qvalue.QValueKindString, + Value: typedRecord.DestinationTableName, + } + entries[3] = qvalue.QValue{ + Kind: qvalue.QValueKindString, + Value: itemsJSON, + } + entries[4] = qvalue.QValue{ + Kind: qvalue.QValueKindInt64, + Value: 2, + } + entries[5] = qvalue.QValue{ + Kind: qvalue.QValueKindString, + Value: itemsJSON, + } + entries[7] = qvalue.QValue{ + Kind: qvalue.QValueKindString, + Value: KeysToString(typedRecord.UnchangedToastColumns), + } + req.TableMapping[typedRecord.DestinationTableName] += 1 + default: + return nil, fmt.Errorf("record type %T not supported in Snowflake flow connector", typedRecord) + } + + if first { + firstCP = record.GetCheckPointID() + first = false + } + + entries[0] = qvalue.QValue{ + Kind: qvalue.QValueKindString, + Value: uuid.New().String(), + } + entries[1] = qvalue.QValue{ + Kind: qvalue.QValueKindInt64, + Value: time.Now().UnixNano(), + } + entries[6] = qvalue.QValue{ + Kind: qvalue.QValueKindInt64, + Value: req.BatchID, + } + + recordStream.Records <- &model.QRecordOrError{ + Record: &model.QRecord{ + NumEntries: 8, + Entries: entries[:], + }, + } + } + + return &model.RecordsToStreamResponse{ + Stream: recordStream, + CP: firstCP, + }, nil +} diff --git a/flow/e2e/s3/cdc_s3_test.go b/flow/e2e/s3/cdc_s3_test.go index 0ec424ea85..aaca2a125a 100644 --- a/flow/e2e/s3/cdc_s3_test.go +++ b/flow/e2e/s3/cdc_s3_test.go @@ -44,16 +44,15 @@ func (s *PeerFlowE2ETestSuiteS3) Test_Complete_Simple_Flow_S3() { s.NoError(err) limits := peerflow.CDCFlowLimits{ - TotalSyncFlows: 9, - MaxBatchSize: 20, + TotalSyncFlows: 5, + MaxBatchSize: 5, } - // Insert 100 rows into postgres, update 20 rows, and 
delete 20 rows go func() { e2e.SetupCDCFlowStatusQuery(env, connectionGen) s.NoError(err) - //insert 100 - for i := 0; i < 100; i++ { + //insert 20 rows + for i := 1; i <= 20; i++ { testKey := fmt.Sprintf("test_key_%d", i) testValue := fmt.Sprintf("test_value_%d", i) _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` @@ -61,19 +60,7 @@ func (s *PeerFlowE2ETestSuiteS3) Test_Complete_Simple_Flow_S3() { `, srcTableName), testKey, testValue) s.NoError(err) } - //update 20 - for i := 0; i < 20; i++ { - _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` - UPDATE %s SET value=$1 where id=$2 - `, srcTableName), "updated_value", i) - s.NoError(err) - } - //delete 20 - _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` - DELETE FROM %s where id < 20 - `, srcTableName)) s.NoError(err) - fmt.Println("Inserted 100 rows into the source table") }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) @@ -93,7 +80,7 @@ func (s *PeerFlowE2ETestSuiteS3) Test_Complete_Simple_Flow_S3() { fmt.Println("Files in Test_Complete_Simple_Flow_S3: ", len(files)) require.NoError(s.T(), err) - require.Equal(s.T(), 8, len(files)) + require.Equal(s.T(), 4, len(files)) env.AssertExpectations(s.T()) } @@ -128,16 +115,15 @@ func (s *PeerFlowE2ETestSuiteS3) Test_Complete_Simple_Flow_GCS_Interop() { s.NoError(err) limits := peerflow.CDCFlowLimits{ - TotalSyncFlows: 9, - MaxBatchSize: 20, + TotalSyncFlows: 5, + MaxBatchSize: 5, } - // Insert 100 rows into postgres, update 20 rows, and delete 20 rows go func() { e2e.SetupCDCFlowStatusQuery(env, connectionGen) s.NoError(err) - //insert 100 - for i := 0; i < 100; i++ { + //insert 20 rows + for i := 1; i <= 20; i++ { testKey := fmt.Sprintf("test_key_%d", i) testValue := fmt.Sprintf("test_value_%d", i) _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` @@ -145,19 +131,7 @@ func (s *PeerFlowE2ETestSuiteS3) Test_Complete_Simple_Flow_GCS_Interop() { `, srcTableName), testKey, testValue) s.NoError(err) } - //update 20 - for i := 0; i < 20; i++ { - _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` - UPDATE %s SET value=$1 where id=$2 - `, srcTableName), "updated_value", i) - s.NoError(err) - } - //delete 20 - _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` - DELETE FROM %s where id < 20 - `, srcTableName)) s.NoError(err) - fmt.Println("Inserted 100 rows into the source table") }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) @@ -174,10 +148,10 @@ func (s *PeerFlowE2ETestSuiteS3) Test_Complete_Simple_Flow_GCS_Interop() { defer cancel() fmt.Println("JobName: ", flowJobName) files, err := s.s3Helper.ListAllFiles(ctx, flowJobName) - fmt.Println("Files in Test_Complete_Simple_Flow_S3: ", len(files)) + fmt.Println("Files in Test_Complete_Simple_Flow_GCS: ", len(files)) require.NoError(s.T(), err) - require.Equal(s.T(), 8, len(files)) + require.Equal(s.T(), 4, len(files)) env.AssertExpectations(s.T()) } diff --git a/flow/model/qrecord_stream.go b/flow/model/qrecord_stream.go index 1fb22826ec..a338ace4f4 100644 --- a/flow/model/qrecord_stream.go +++ b/flow/model/qrecord_stream.go @@ -19,6 +19,18 @@ type QRecordStream struct { schemaCache *QRecordSchema } +type RecordsToStreamRequest struct { + Records []Record + TableMapping map[string]uint32 + CP int64 + BatchID int64 +} + +type RecordsToStreamResponse struct { + Stream *QRecordStream + CP int64 +} + func NewQRecordStream(buffer int) *QRecordStream { return &QRecordStream{ schema: make(chan *QRecordSchemaOrError, 1),
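// Illustrative sketch (not part of the patch above): how a connector wires the two
// helpers this change introduces — the shared Postgres metadata store in
// connectors/external_metadata and utils.RecordsToRawTableStream. The function and
// method signatures are taken from the diff; the connector type `exampleConnector`,
// the schema name "peerdb_example_metadata", and the surrounding error handling are
// hypothetical and only stand in for the real S3/EventHub connectors.
package connexample

import (
	"context"
	"fmt"

	metadataStore "github.com/PeerDB-io/peer-flow/connectors/external_metadata"
	"github.com/PeerDB-io/peer-flow/connectors/utils"
	"github.com/PeerDB-io/peer-flow/generated/protos"
	"github.com/PeerDB-io/peer-flow/model"
)

type exampleConnector struct {
	ctx        context.Context
	pgMetadata *metadataStore.PostgresMetadataStore
}

func newExampleConnector(ctx context.Context, metadataDB *protos.PostgresConfig) (*exampleConnector, error) {
	// Each connector now passes its own schema name instead of owning a
	// connector-specific copy of the metadata store code.
	pgMetadata, err := metadataStore.NewPostgresMetadataStore(ctx, metadataDB, "peerdb_example_metadata")
	if err != nil {
		return nil, fmt.Errorf("failed to create postgres metadata store: %w", err)
	}
	return &exampleConnector{ctx: ctx, pgMetadata: pgMetadata}, nil
}

func (c *exampleConnector) syncRecords(req *model.SyncRecordsRequest) (*model.QRecordStream, int64, error) {
	syncBatchID, err := c.pgMetadata.GetLastBatchID(req.FlowJobName)
	if err != nil {
		return nil, 0, err
	}
	syncBatchID += 1

	// Convert CDC records into the shared raw-table stream instead of building
	// the 8-column QRecordStream by hand in every connector.
	tableNameRowsMapping := make(map[string]uint32)
	streamRes, err := utils.RecordsToRawTableStream(model.RecordsToStreamRequest{
		Records:      req.Records.Records,
		TableMapping: tableNameRowsMapping,
		CP:           0,
		BatchID:      syncBatchID,
	})
	if err != nil {
		return nil, 0, fmt.Errorf("failed to convert records to raw table stream: %w", err)
	}

	// ... write streamRes.Stream to the destination here, then persist progress
	// through the shared store, mirroring the S3 and EventHub connectors.
	if err := c.pgMetadata.UpdateLastOffset(req.FlowJobName, req.Records.LastCheckPointID); err != nil {
		return nil, 0, err
	}
	if err := c.pgMetadata.IncrementID(req.FlowJobName); err != nil {
		return nil, 0, err
	}
	return streamRes.Stream, streamRes.CP, nil
}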