diff --git a/flow/connectors/bigquery/bigquery.go b/flow/connectors/bigquery/bigquery.go
index edb3cbdcf3..876e437a11 100644
--- a/flow/connectors/bigquery/bigquery.go
+++ b/flow/connectors/bigquery/bigquery.go
@@ -18,6 +18,7 @@ import (
 	"google.golang.org/api/iterator"
 	"google.golang.org/api/option"
 
+	metadataStore "github.com/PeerDB-io/peer-flow/connectors/external_metadata"
 	"github.com/PeerDB-io/peer-flow/connectors/utils"
 	cc "github.com/PeerDB-io/peer-flow/connectors/utils/catalog"
 	"github.com/PeerDB-io/peer-flow/generated/protos"
@@ -27,14 +28,6 @@ import (
 )
 
 const (
-	// MirrorJobsTable has the following schema:
-	// CREATE TABLE peerdb_mirror_jobs (
-	//   mirror_job_id STRING NOT NULL,
-	//   offset INTEGER NOT NULL,
-	//   sync_batch_id INTEGER NOT NULL
-	//   normalize_batch_id INTEGER
-	// )
-	MirrorJobsTable      = "peerdb_mirror_jobs"
 	SyncRecordsBatchSize = 1024
 )
 
@@ -57,6 +50,7 @@ type BigQueryConnector struct {
 	bqConfig      *protos.BigqueryConfig
 	client        *bigquery.Client
 	storageClient *storage.Client
+	pgMetadata    *metadataStore.PostgresMetadataStore
 	datasetID     string
 	projectID     string
 	catalogPool   *pgxpool.Pool
@@ -236,6 +230,7 @@ func NewBigQueryConnector(ctx context.Context, config *protos.BigqueryConfig) (*
 		client:        client,
 		datasetID:     datasetID,
 		projectID:     projectID,
+		pgMetadata:    metadataStore.NewPostgresMetadataStoreFromCatalog(ctx, catalogPool),
 		storageClient: storageClient,
 		catalogPool:   catalogPool,
 		logger:        *slog.With(slog.String(string(shared.FlowNameKey), flowName)),
@@ -263,10 +258,8 @@ func (c *BigQueryConnector) ConnectionActive() error {
 	return nil
 }
 
-// NeedsSetupMetadataTables returns true if the metadata tables need to be set up.
 func (c *BigQueryConnector) NeedsSetupMetadataTables() bool {
-	_, err := c.client.DatasetInProject(c.projectID, c.datasetID).Table(MirrorJobsTable).Metadata(c.ctx)
-	return err != nil
+	return false
 }
 
 func (c *BigQueryConnector) waitForTableReady(datasetTable *datasetTable) error {
@@ -324,135 +317,24 @@ func (c *BigQueryConnector) ReplayTableSchemaDeltas(flowJobName string,
 	return nil
 }
 
-// SetupMetadataTables sets up the metadata tables.
 func (c *BigQueryConnector) SetupMetadataTables() error {
-	// check if the dataset exists
-	dataset := c.client.DatasetInProject(c.projectID, c.datasetID)
-	if _, err := dataset.Metadata(c.ctx); err != nil {
-		// create the dataset as it doesn't exist
-		if err := dataset.Create(c.ctx, nil); err != nil {
-			return fmt.Errorf("failed to create dataset %s: %w", c.datasetID, err)
-		}
-	}
-
-	// Create the mirror jobs table, NeedsSetupMetadataTables ensures it doesn't exist.
-	mirrorJobsTable := dataset.Table(MirrorJobsTable)
-	mirrorJobsTableMetadata := &bigquery.TableMetadata{
-		Schema: bigquery.Schema{
-			{Name: "mirror_job_name", Type: bigquery.StringFieldType},
-			{Name: "offset", Type: bigquery.IntegerFieldType},
-			{Name: "sync_batch_id", Type: bigquery.IntegerFieldType},
-			{Name: "normalize_batch_id", Type: bigquery.IntegerFieldType},
-		},
-	}
-	if err := mirrorJobsTable.Create(c.ctx, mirrorJobsTableMetadata); err != nil {
-		// if the table already exists, ignore the error
-		if !strings.Contains(err.Error(), "Already Exists") {
-			return fmt.Errorf("failed to create table %s: %w", MirrorJobsTable, err)
-		} else {
-			c.logger.Info(fmt.Sprintf("table %s already exists", MirrorJobsTable))
-		}
-	}
-
 	return nil
 }
 
 func (c *BigQueryConnector) GetLastOffset(jobName string) (int64, error) {
-	query := fmt.Sprintf("SELECT offset FROM %s WHERE mirror_job_name = '%s'", MirrorJobsTable, jobName)
-	q := c.client.Query(query)
-	q.DefaultProjectID = c.projectID
-	q.DefaultDatasetID = c.datasetID
-	it, err := q.Read(c.ctx)
-	if err != nil {
-		err = fmt.Errorf("failed to run query %s on BigQuery:\n %w", query, err)
-		return 0, err
-	}
-
-	var row []bigquery.Value
-	err = it.Next(&row)
-	if err != nil {
-		c.logger.Info("no row found, returning nil")
-		return 0, nil
-	}
-
-	if row[0] == nil {
-		c.logger.Info("no offset found, returning nil")
-		return 0, nil
-	} else {
-		return row[0].(int64), nil
-	}
+	return c.pgMetadata.FetchLastOffset(jobName)
 }
 
-func (c *BigQueryConnector) SetLastOffset(jobName string, lastOffset int64) error {
-	query := fmt.Sprintf(
-		"UPDATE %s SET offset = GREATEST(offset, %d) WHERE mirror_job_name = '%s'",
-		MirrorJobsTable,
-		lastOffset,
-		jobName,
-	)
-	q := c.client.Query(query)
-	q.DefaultProjectID = c.projectID
-	q.DefaultDatasetID = c.datasetID
-	_, err := q.Read(c.ctx)
-	if err != nil {
-		return fmt.Errorf("failed to run query %s on BigQuery:\n %w", query, err)
-	}
-
-	return nil
+func (c *BigQueryConnector) SetLastOffset(jobName string, offset int64) error {
+	return c.pgMetadata.UpdateLastOffset(jobName, offset)
 }
 
 func (c *BigQueryConnector) GetLastSyncBatchID(jobName string) (int64, error) {
-	query := fmt.Sprintf("SELECT sync_batch_id FROM %s WHERE mirror_job_name = '%s'",
-		MirrorJobsTable, jobName)
-	q := c.client.Query(query)
-	q.DisableQueryCache = true
-	q.DefaultProjectID = c.projectID
-	q.DefaultDatasetID = c.datasetID
-	it, err := q.Read(c.ctx)
-	if err != nil {
-		err = fmt.Errorf("failed to run query %s on BigQuery:\n %w", query, err)
-		return -1, err
-	}
-
-	var row []bigquery.Value
-	err = it.Next(&row)
-	if err != nil {
-		c.logger.Info("no row found")
-		return 0, nil
-	}
-
-	if row[0] == nil {
-		c.logger.Info("no sync_batch_id found, returning 0")
-		return 0, nil
-	} else {
-		return row[0].(int64), nil
-	}
+	return c.pgMetadata.GetLastBatchID(jobName)
 }
 
 func (c *BigQueryConnector) GetLastNormalizeBatchID(jobName string) (int64, error) {
-	query := fmt.Sprintf("SELECT normalize_batch_id FROM %s WHERE mirror_job_name = '%s'",
-		MirrorJobsTable, jobName)
-	q := c.client.Query(query)
-	q.DisableQueryCache = true
-	q.DefaultProjectID = c.projectID
-	q.DefaultDatasetID = c.datasetID
-	it, err := q.Read(c.ctx)
-	if err != nil {
-		err = fmt.Errorf("failed to run query %s on BigQuery:\n %w", query, err)
-		return 0, err
-	}
-
-	var row []bigquery.Value
-	err = it.Next(&row)
-	if err != nil {
-		c.logger.Info("no row found for job")
-		return 0, nil
-	}
-
-	if row[0] != nil {
-		return row[0].(int64), nil
-	}
-	return 0, nil
+	return c.pgMetadata.GetLastNormalizeBatchID(jobName)
 }
 
 func (c *BigQueryConnector) getDistinctTableNamesInBatch(flowJobName string, syncBatchID int64,
@@ -591,20 +473,15 @@ func (c *BigQueryConnector) NormalizeRecords(req *model.NormalizeRecordsRequest)
 		return nil, fmt.Errorf("failed to get batch for the current mirror: %v", err)
 	}
 
-	hasJob, err := c.metadataHasJob(req.FlowJobName)
-	if err != nil {
-		return nil, fmt.Errorf("failed to check if job exists: %w", err)
-	}
-	// if job is not yet found in the peerdb_mirror_jobs_table
-	// OR sync is lagging end normalize
-	if !hasJob || normBatchID >= req.SyncBatchID {
-		c.logger.Info("waiting for sync to catch up, so finishing")
+	// normalize has caught up with sync, so wait until more records are loaded.
+	if normBatchID >= req.SyncBatchID {
 		return &model.NormalizeResponse{
 			Done:         false,
 			StartBatchID: normBatchID,
 			EndBatchID:   req.SyncBatchID,
 		}, nil
 	}
+
 	distinctTableNames, err := c.getDistinctTableNamesInBatch(
 		req.FlowJobName,
 		req.SyncBatchID,
@@ -662,17 +539,10 @@ func (c *BigQueryConnector) NormalizeRecords(req *model.NormalizeRecordsRequest)
 			}
 		}
 	}
-	// update metadata to make the last normalized batch id to the recent last sync batch id.
-	updateMetadataStmt := fmt.Sprintf(
-		"UPDATE %s SET normalize_batch_id=%d WHERE mirror_job_name='%s';",
-		MirrorJobsTable, req.SyncBatchID, req.FlowJobName)
-
-	query := c.client.Query(updateMetadataStmt)
-	query.DefaultProjectID = c.projectID
-	query.DefaultDatasetID = c.datasetID
-	_, err = query.Read(c.ctx)
+
+	err = c.pgMetadata.UpdateNormalizeBatchID(req.FlowJobName, req.SyncBatchID)
 	if err != nil {
-		return nil, fmt.Errorf("failed to execute update metadata statements %s: %v", updateMetadataStmt, err)
+		return nil, err
 	}
 
 	return &model.NormalizeResponse{
@@ -754,56 +624,6 @@ func (c *BigQueryConnector) CreateRawTable(req *protos.CreateRawTableInput) (*pr
 	}, nil
 }
 
-// getUpdateMetadataStmt updates the metadata tables for a given job.
-func (c *BigQueryConnector) getUpdateMetadataStmt(jobName string, lastSyncedCheckpointID int64,
-	batchID int64,
-) (string, error) {
-	hasJob, err := c.metadataHasJob(jobName)
-	if err != nil {
-		return "", fmt.Errorf("failed to check if job exists: %w", err)
-	}
-
-	// create the job in the metadata table
-	jobStatement := fmt.Sprintf(
-		"INSERT INTO %s (mirror_job_name,offset,sync_batch_id) VALUES ('%s',%d,%d);",
-		MirrorJobsTable, jobName, lastSyncedCheckpointID, batchID)
-	if hasJob {
-		jobStatement = fmt.Sprintf(
-			"UPDATE %s SET offset=GREATEST(offset,%d),sync_batch_id=%d WHERE mirror_job_name = '%s';",
-			MirrorJobsTable, lastSyncedCheckpointID, batchID, jobName)
-	}
-
-	return jobStatement, nil
-}
-
-// metadataHasJob checks if the metadata table has the given job.
-func (c *BigQueryConnector) metadataHasJob(jobName string) (bool, error) {
-	checkStmt := fmt.Sprintf(
-		"SELECT COUNT(*) FROM %s WHERE mirror_job_name = '%s'",
-		MirrorJobsTable, jobName)
-
-	q := c.client.Query(checkStmt)
-	q.DefaultProjectID = c.projectID
-	q.DefaultDatasetID = c.datasetID
-	it, err := q.Read(c.ctx)
-	if err != nil {
-		return false, fmt.Errorf("failed to check if job exists: %w", err)
-	}
-
-	var row []bigquery.Value
-	err = it.Next(&row)
-	if err != nil {
-		return false, fmt.Errorf("failed read row: %w", err)
-	}
-
-	count, ok := row[0].(int64)
-	if !ok {
-		return false, fmt.Errorf("failed to convert count to int64")
-	}
-
-	return count > 0, nil
-}
-
 // SetupNormalizedTables sets up normalized tables, implementing the Connector interface.
 // This runs CREATE TABLE IF NOT EXISTS on bigquery, using the schema and table name provided.
 func (c *BigQueryConnector) SetupNormalizedTables(
@@ -911,22 +731,18 @@ func (c *BigQueryConnector) SetupNormalizedTables(
 }
 
 func (c *BigQueryConnector) SyncFlowCleanup(jobName string) error {
+	err := c.pgMetadata.DropMetadata(jobName)
+	if err != nil {
+		return fmt.Errorf("unable to clear metadata for sync flow cleanup: %w", err)
+	}
+
 	dataset := c.client.DatasetInProject(c.projectID, c.datasetID)
 	// deleting PeerDB specific tables
-	err := dataset.Table(c.getRawTableName(jobName)).Delete(c.ctx)
+	err = dataset.Table(c.getRawTableName(jobName)).Delete(c.ctx)
 	if err != nil {
 		return fmt.Errorf("failed to delete raw table: %w", err)
 	}
 
-	// deleting job from metadata table
-	query := fmt.Sprintf("DELETE FROM %s WHERE mirror_job_name = '%s'", MirrorJobsTable, jobName)
-	queryHandler := c.client.Query(query)
-	queryHandler.DefaultProjectID = c.projectID
-	queryHandler.DefaultDatasetID = c.datasetID
-	_, err = queryHandler.Read(c.ctx)
-	if err != nil {
-		return fmt.Errorf("failed to delete job from metadata table: %w", err)
-	}
 	return nil
 }
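Note on the bigquery.go changes above: CDC metadata (last offset, sync and normalize batch ids) now lives in the catalog Postgres instead of a peerdb_mirror_jobs table in the destination dataset, so NeedsSetupMetadataTables and SetupMetadataTables become no-ops. As a minimal sketch of what the delegated offset methods could look like on the Postgres side, assuming pgx/v5 (github.com/jackc/pgx/v5) plus the errors and fmt imports, and a hypothetical metadata_last_sync_state(job_name, last_offset) table; the real store.go schema may differ:

// Sketch only: table and column names here are assumptions, not the
// actual external_metadata schema.
func (p *PostgresMetadataStore) FetchLastOffset(jobName string) (int64, error) {
	var offset int64
	err := p.pool.QueryRow(p.ctx,
		`SELECT last_offset FROM metadata_last_sync_state WHERE job_name = $1`,
		jobName).Scan(&offset)
	if errors.Is(err, pgx.ErrNoRows) {
		// no state recorded yet; mirrors the old GetLastOffset behavior of returning 0
		return 0, nil
	} else if err != nil {
		return 0, fmt.Errorf("failed to fetch last offset: %w", err)
	}
	return offset, nil
}

func (p *PostgresMetadataStore) UpdateLastOffset(jobName string, offset int64) error {
	// Preserves the GREATEST(offset, new) semantics of the removed BigQuery
	// UPDATE: the stored offset never moves backwards.
	_, err := p.pool.Exec(p.ctx, `
		INSERT INTO metadata_last_sync_state (job_name, last_offset)
		VALUES ($1, $2)
		ON CONFLICT (job_name)
		DO UPDATE SET last_offset = GREATEST(metadata_last_sync_state.last_offset, $2)`,
		jobName, offset)
	return err
}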
diff --git a/flow/connectors/bigquery/qrep.go b/flow/connectors/bigquery/qrep.go
index c0aafe045f..8978f82a11 100644
--- a/flow/connectors/bigquery/qrep.go
+++ b/flow/connectors/bigquery/qrep.go
@@ -3,13 +3,9 @@ package connbigquery
 import (
 	"fmt"
 	"log/slog"
-	"reflect"
 	"strings"
-	"time"
 
 	"cloud.google.com/go/bigquery"
-	"google.golang.org/api/iterator"
-	"google.golang.org/protobuf/encoding/protojson"
 
 	"github.com/PeerDB-io/peer-flow/generated/protos"
 	"github.com/PeerDB-io/peer-flow/model"
@@ -32,7 +28,7 @@ func (c *BigQueryConnector) SyncQRepRecords(
 		return 0, err
 	}
 
-	done, err := c.isPartitionSynced(partition.PartitionId)
+	done, err := c.pgMetadata.IsQrepPartitionSynced(config.FlowJobName, partition.PartitionId)
 	if err != nil {
 		return 0, fmt.Errorf("failed to check if partition %s is synced: %w", partition.PartitionId, err)
 	}
@@ -97,69 +93,12 @@ func (c *BigQueryConnector) replayTableSchemaDeltasQRep(config *protos.QRepConfi
 	return dstTableMetadata, nil
 }
 
-func (c *BigQueryConnector) createMetadataInsertStatement(
-	partition *protos.QRepPartition,
-	jobName string,
-	startTime time.Time,
-) (string, error) {
-	// marshal the partition to json using protojson
-	pbytes, err := protojson.Marshal(partition)
-	if err != nil {
-		return "", fmt.Errorf("failed to marshal partition to json: %v", err)
-	}
-
-	// convert the bytes to string
-	partitionJSON := string(pbytes)
-
-	insertMetadataStmt := fmt.Sprintf(
-		"INSERT INTO _peerdb_query_replication_metadata"+
-			"(flowJobName, partitionID, syncPartition, syncStartTime, syncFinishTime) "+
-			"VALUES ('%s', '%s', JSON '%s', TIMESTAMP('%s'), CURRENT_TIMESTAMP());",
-		jobName, partition.PartitionId,
-		partitionJSON, startTime.Format(time.RFC3339))
-
-	return insertMetadataStmt, nil
-}
-
 func (c *BigQueryConnector) SetupQRepMetadataTables(config *protos.QRepConfig) error {
-	qRepMetadataTableName := "_peerdb_query_replication_metadata"
-
-	// define the schema
-	qRepMetadataSchema := bigquery.Schema{
-		{Name: "flowJobName", Type: bigquery.StringFieldType},
-		{Name: "partitionID", Type: bigquery.StringFieldType},
-		{Name: "syncPartition", Type: bigquery.JSONFieldType},
-		{Name: "syncStartTime", Type: bigquery.TimestampFieldType},
-		{Name: "syncFinishTime", Type: bigquery.TimestampFieldType},
-	}
-
-	// reference the table
-	table := c.client.DatasetInProject(c.projectID, c.datasetID).Table(qRepMetadataTableName)
-
-	// check if the table exists
-	meta, err := table.Metadata(c.ctx)
-	if err == nil {
-		// table exists, check if the schema matches
-		if !reflect.DeepEqual(meta.Schema, qRepMetadataSchema) {
-			return fmt.Errorf("table %s.%s already exists with different schema", c.datasetID, qRepMetadataTableName)
-		} else {
-			return nil
-		}
-	}
-
-	// table does not exist, create it
-	err = table.Create(c.ctx, &bigquery.TableMetadata{
-		Schema: qRepMetadataSchema,
-	})
-	if err != nil {
-		return fmt.Errorf("failed to create table %s.%s: %w", c.datasetID, qRepMetadataTableName, err)
-	}
-
 	if config.WriteMode.WriteType == protos.QRepWriteType_QREP_WRITE_MODE_OVERWRITE {
 		query := c.client.Query(fmt.Sprintf("TRUNCATE TABLE %s", config.DestinationTableIdentifier))
 		query.DefaultDatasetID = c.datasetID
 		query.DefaultProjectID = c.projectID
-		_, err = query.Read(c.ctx)
+		_, err := query.Read(c.ctx)
 		if err != nil {
 			return fmt.Errorf("failed to TRUNCATE table before query replication: %w", err)
 		}
@@ -167,38 +106,3 @@ func (c *BigQueryConnector) SetupQRepMetadataTables(config *protos.QRepConfig) e
 
 	return nil
 }
-
-func (c *BigQueryConnector) isPartitionSynced(partitionID string) (bool, error) {
-	queryString := fmt.Sprintf(
-		"SELECT COUNT(*) FROM _peerdb_query_replication_metadata WHERE partitionID = '%s';",
-		partitionID,
-	)
-
-	query := c.client.Query(queryString)
-	query.DefaultDatasetID = c.datasetID
-	query.DefaultProjectID = c.projectID
-	it, err := query.Read(c.ctx)
-	if err != nil {
-		return false, fmt.Errorf("failed to execute query: %w", err)
-	}
-
-	var values []bigquery.Value
-	err = it.Next(&values)
-	if err == iterator.Done {
-		return false, nil
-	}
-	if err != nil {
-		return false, fmt.Errorf("failed to iterate query results: %w", err)
-	}
-
-	if len(values) != 1 {
-		return false, fmt.Errorf("expected 1 value, got %d", len(values))
-	}
-
-	count, ok := values[0].(int64)
-	if !ok {
-		return false, fmt.Errorf("failed to convert %v to int64", reflect.TypeOf(values[0]))
-	}
-
-	return count > 0, nil
-}
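Similarly for qrep.go: the per-partition bookkeeping table _peerdb_query_replication_metadata in BigQuery goes away, and SetupQRepMetadataTables shrinks to just the OVERWRITE-mode truncate. Note the new check is keyed by both flow job name and partition id, whereas the removed isPartitionSynced matched on partition id alone. A sketch of the Postgres-side check, again assuming pgx/v5 and a hypothetical metadata_qrep_partitions(job_name, partition_id) table rather than the actual store.go schema:

// Sketch only: the table and column names are assumptions.
func (p *PostgresMetadataStore) IsQrepPartitionSynced(jobName string, partitionID string) (bool, error) {
	var synced bool
	err := p.pool.QueryRow(p.ctx,
		`SELECT EXISTS (
			SELECT 1 FROM metadata_qrep_partitions
			WHERE job_name = $1 AND partition_id = $2
		)`,
		jobName, partitionID).Scan(&synced)
	if err != nil {
		return false, fmt.Errorf("failed to check if partition is synced: %w", err)
	}
	return synced, nil
}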
diff --git a/flow/connectors/bigquery/qrep_avro_sync.go b/flow/connectors/bigquery/qrep_avro_sync.go
index c8b182706f..894475c6b7 100644
--- a/flow/connectors/bigquery/qrep_avro_sync.go
+++ b/flow/connectors/bigquery/qrep_avro_sync.go
@@ -63,7 +63,7 @@ func (s *QRepAvroSyncMethod) SyncRecords(
 			table:   stagingTable,
 		}, stream, req.FlowJobName)
 	if err != nil {
-		return nil, fmt.Errorf("failed to push to avro stage: %v", err)
+		return nil, fmt.Errorf("failed to push to avro stage: %w", err)
 	}
 
 	bqClient := s.connector.client
@@ -73,11 +73,7 @@ func (s *QRepAvroSyncMethod) SyncRecords(
 
 	lastCP, err := req.Records.GetLastCheckpoint()
 	if err != nil {
-		return nil, fmt.Errorf("failed to get last checkpoint: %v", err)
-	}
-	updateMetadataStmt, err := s.connector.getUpdateMetadataStmt(req.FlowJobName, lastCP, syncBatchID)
-	if err != nil {
-		return nil, fmt.Errorf("failed to update metadata: %v", err)
+		return nil, fmt.Errorf("failed to get last checkpoint: %w", err)
 	}
 
 	activity.RecordHeartbeat(s.connector.ctx,
@@ -91,18 +87,17 @@ func (s *QRepAvroSyncMethod) SyncRecords(
 		return nil, fmt.Errorf("failed to sync schema changes: %w", err)
 	}
 
-	stmts := []string{
-		"BEGIN TRANSACTION;",
-		insertStmt,
-		updateMetadataStmt,
-		"COMMIT TRANSACTION;",
-	}
-	query := bqClient.Query(strings.Join(stmts, "\n"))
+	query := bqClient.Query(insertStmt)
 	query.DefaultDatasetID = s.connector.datasetID
 	query.DefaultProjectID = s.connector.projectID
 	_, err = query.Read(s.connector.ctx)
 	if err != nil {
-		return nil, fmt.Errorf("failed to execute statements in a transaction: %v", err)
+		return nil, fmt.Errorf("failed to execute insert statement: %w", err)
+	}
+
+	err = s.connector.pgMetadata.FinishBatch(req.FlowJobName, syncBatchID, lastCP)
+	if err != nil {
+		return nil, fmt.Errorf("failed to update metadata: %w", err)
 	}
 
 	// drop the staging table
@@ -180,7 +175,7 @@ func (s *QRepAvroSyncMethod) SyncQRepRecords(
 	numRecords, err := s.writeToStage(partition.PartitionId, flowJobName, avroSchema,
 		stagingDatasetTable, stream, flowJobName)
 	if err != nil {
-		return -1, fmt.Errorf("failed to push to avro stage: %v", err)
+		return -1, fmt.Errorf("failed to push to avro stage: %w", err)
 	}
 	activity.RecordHeartbeat(s.connector.ctx, fmt.Sprintf(
 		"Flow job %s: running insert-into-select transaction for"+
@@ -202,24 +197,19 @@ func (s *QRepAvroSyncMethod) SyncQRepRecords(
 	insertStmt := fmt.Sprintf("INSERT INTO `%s` SELECT %s FROM `%s`;",
 		dstTableName, selector, stagingDatasetTable.string())
 
-	insertMetadataStmt, err := s.connector.createMetadataInsertStatement(partition, flowJobName, startTime)
-	if err != nil {
-		return -1, fmt.Errorf("failed to create metadata insert statement: %v", err)
-	}
 	slog.Info("Performing transaction inside QRep sync function", flowLog)
-	stmts := []string{
-		"BEGIN TRANSACTION;",
-		insertStmt,
-		insertMetadataStmt,
-		"COMMIT TRANSACTION;",
-	}
-	query := bqClient.Query(strings.Join(stmts, "\n"))
+	query := bqClient.Query(insertStmt)
 	query.DefaultDatasetID = s.connector.datasetID
 	query.DefaultProjectID = s.connector.projectID
 	_, err = query.Read(s.connector.ctx)
 	if err != nil {
-		return -1, fmt.Errorf("failed to execute statements in a transaction: %v", err)
+		return -1, fmt.Errorf("failed to execute insert statement: %w", err)
+	}
+
+	err = s.connector.pgMetadata.FinishQrepPartition(partition, flowJobName, startTime)
+	if err != nil {
+		return -1, err
 	}
 
 	// drop the staging table
@@ -283,7 +273,7 @@ func DefineAvroSchema(dstTableName string,
 
 	avroSchemaJSON, err := json.Marshal(avroSchema)
 	if err != nil {
-		return nil, fmt.Errorf("failed to marshal Avro schema to JSON: %v", err)
+		return nil, fmt.Errorf("failed to marshal Avro schema to JSON: %w", err)
 	}
 
 	return &model.QRecordAvroSchemaDefinition{
diff --git a/flow/connectors/external_metadata/store.go b/flow/connectors/external_metadata/store.go
index d70a851e44..099bd97b51 100644
--- a/flow/connectors/external_metadata/store.go
+++ b/flow/connectors/external_metadata/store.go
@@ -41,6 +41,15 @@ func NewPostgresMetadataStore(ctx context.Context) (*PostgresMetadataStore, erro
 	}, nil
 }
 
+func NewPostgresMetadataStoreFromCatalog(ctx context.Context, pool *pgxpool.Pool) *PostgresMetadataStore {
+	flowName, _ := ctx.Value(shared.FlowNameKey).(string)
+	return &PostgresMetadataStore{
+		ctx:    ctx,
+		pool:   pool,
+		logger: *slog.With(slog.String(string(shared.FlowNameKey), flowName)),
+	}
+}
+
 func (p *PostgresMetadataStore) Ping() error {
 	pingErr := p.pool.Ping(p.ctx)
 	if pingErr != nil {
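One behavioral consequence worth flagging: the old code committed the destination insert and the metadata update in a single BigQuery multi-statement transaction, while the new code writes to BigQuery first and then records progress in Postgres, so the two operations can no longer be atomic. A failure between the insert and FinishBatch or FinishQrepPartition leaves data written but unrecorded, and a retry will re-run that batch, so correctness now leans on the upfront IsQrepPartitionSynced check and at-least-once semantics rather than on one transaction. As a sketch of one plausible shape for FinishBatch, reusing the same hypothetical metadata_last_sync_state table assumed above (not the real store.go implementation):

// Sketch only: advances the batch id and checkpoint in one Postgres
// statement; table and column names are assumptions.
func (p *PostgresMetadataStore) FinishBatch(jobName string, syncBatchID int64, offset int64) error {
	_, err := p.pool.Exec(p.ctx, `
		UPDATE metadata_last_sync_state
		SET sync_batch_id = $2,
		    last_offset = GREATEST(last_offset, $3)
		WHERE job_name = $1`,
		jobName, syncBatchID, offset)
	if err != nil {
		return fmt.Errorf("failed to finish batch: %w", err)
	}
	return nil
}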