BigQuery: stop storing metadata on warehouse; store in catalog (#1191)
Follow-up to #1179
serprex authored and pankaj-peerdb committed Feb 6, 2024
1 parent 5f1b677 commit de6d4a0
Showing 4 changed files with 50 additions and 331 deletions.
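
In short: per-mirror state (offset, sync batch ID, normalize batch ID) no longer lives in a peerdb_mirror_jobs table on the BigQuery dataset; the connector now delegates to the catalog-backed PostgresMetadataStore. Judging from the call sites in this diff, the surface the connector relies on looks roughly like the sketch below (method names are taken verbatim from the diff; grouping them into an interface is illustrative, not necessarily how connectors/external_metadata declares them):

    // Inferred from call sites in bigquery.go; the actual declarations live in
    // connectors/external_metadata and may differ.
    type mirrorMetadataStore interface {
        FetchLastOffset(jobName string) (int64, error)
        UpdateLastOffset(jobName string, offset int64) error
        GetLastBatchID(jobName string) (int64, error)
        GetLastNormalizeBatchID(jobName string) (int64, error)
        UpdateNormalizeBatchID(jobName string, batchID int64) error
        DropMetadata(jobName string) error
    }

Keeping this bookkeeping in the catalog avoids a warehouse query per lookup and removes the need for any BigQuery-side DDL, which is why NeedsSetupMetadataTables below now returns false unconditionally.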
226 changes: 21 additions & 205 deletions flow/connectors/bigquery/bigquery.go
@@ -18,6 +18,7 @@ import (
"google.golang.org/api/iterator"
"google.golang.org/api/option"

metadataStore "github.com/PeerDB-io/peer-flow/connectors/external_metadata"
"github.com/PeerDB-io/peer-flow/connectors/utils"
cc "github.com/PeerDB-io/peer-flow/connectors/utils/catalog"
"github.com/PeerDB-io/peer-flow/generated/protos"
@@ -27,14 +28,6 @@ import (
)

const (
// MirrorJobsTable has the following schema:
// CREATE TABLE peerdb_mirror_jobs (
// mirror_job_name STRING NOT NULL,
// offset INTEGER NOT NULL,
// sync_batch_id INTEGER NOT NULL,
// normalize_batch_id INTEGER
// )
MirrorJobsTable = "peerdb_mirror_jobs"
SyncRecordsBatchSize = 1024
)

@@ -57,6 +50,7 @@ type BigQueryConnector struct {
bqConfig *protos.BigqueryConfig
client *bigquery.Client
storageClient *storage.Client
pgMetadata *metadataStore.PostgresMetadataStore
datasetID string
projectID string
catalogPool *pgxpool.Pool
@@ -236,6 +230,7 @@ func NewBigQueryConnector(ctx context.Context, config *protos.BigqueryConfig) (*
client: client,
datasetID: datasetID,
projectID: projectID,
pgMetadata: metadataStore.NewPostgresMetadataStoreFromCatalog(ctx, catalogPool),
storageClient: storageClient,
catalogPool: catalogPool,
logger: *slog.With(slog.String(string(shared.FlowNameKey), flowName)),
@@ -263,10 +258,8 @@ func (c *BigQueryConnector) ConnectionActive() error {
return nil
}

// NeedsSetupMetadataTables returns true if the metadata tables need to be set up.
func (c *BigQueryConnector) NeedsSetupMetadataTables() bool {
_, err := c.client.DatasetInProject(c.projectID, c.datasetID).Table(MirrorJobsTable).Metadata(c.ctx)
return err != nil
return false
}

func (c *BigQueryConnector) waitForTableReady(datasetTable *datasetTable) error {
@@ -324,135 +317,24 @@ func (c *BigQueryConnector) ReplayTableSchemaDeltas(flowJobName string,
return nil
}

// SetupMetadataTables sets up the metadata tables.
func (c *BigQueryConnector) SetupMetadataTables() error {
// check if the dataset exists
dataset := c.client.DatasetInProject(c.projectID, c.datasetID)
if _, err := dataset.Metadata(c.ctx); err != nil {
// create the dataset as it doesn't exist
if err := dataset.Create(c.ctx, nil); err != nil {
return fmt.Errorf("failed to create dataset %s: %w", c.datasetID, err)
}
}

// Create the mirror jobs table; NeedsSetupMetadataTables ensures it doesn't already exist.
mirrorJobsTable := dataset.Table(MirrorJobsTable)
mirrorJobsTableMetadata := &bigquery.TableMetadata{
Schema: bigquery.Schema{
{Name: "mirror_job_name", Type: bigquery.StringFieldType},
{Name: "offset", Type: bigquery.IntegerFieldType},
{Name: "sync_batch_id", Type: bigquery.IntegerFieldType},
{Name: "normalize_batch_id", Type: bigquery.IntegerFieldType},
},
}
if err := mirrorJobsTable.Create(c.ctx, mirrorJobsTableMetadata); err != nil {
// if the table already exists, ignore the error
if !strings.Contains(err.Error(), "Already Exists") {
return fmt.Errorf("failed to create table %s: %w", MirrorJobsTable, err)
} else {
c.logger.Info(fmt.Sprintf("table %s already exists", MirrorJobsTable))
}
}

return nil
}

func (c *BigQueryConnector) GetLastOffset(jobName string) (int64, error) {
query := fmt.Sprintf("SELECT offset FROM %s WHERE mirror_job_name = '%s'", MirrorJobsTable, jobName)
q := c.client.Query(query)
q.DefaultProjectID = c.projectID
q.DefaultDatasetID = c.datasetID
it, err := q.Read(c.ctx)
if err != nil {
err = fmt.Errorf("failed to run query %s on BigQuery:\n %w", query, err)
return 0, err
}

var row []bigquery.Value
err = it.Next(&row)
if err != nil {
c.logger.Info("no row found, returning nil")
return 0, nil
}

if row[0] == nil {
c.logger.Info("no offset found, returning nil")
return 0, nil
} else {
return row[0].(int64), nil
}
return c.pgMetadata.FetchLastOffset(jobName)
}

func (c *BigQueryConnector) SetLastOffset(jobName string, lastOffset int64) error {
query := fmt.Sprintf(
"UPDATE %s SET offset = GREATEST(offset, %d) WHERE mirror_job_name = '%s'",
MirrorJobsTable,
lastOffset,
jobName,
)
q := c.client.Query(query)
q.DefaultProjectID = c.projectID
q.DefaultDatasetID = c.datasetID
_, err := q.Read(c.ctx)
if err != nil {
return fmt.Errorf("failed to run query %s on BigQuery:\n %w", query, err)
}

return nil
func (c *BigQueryConnector) SetLastOffset(jobName string, offset int64) error {
return c.pgMetadata.UpdateLastOffset(jobName, offset)
}

func (c *BigQueryConnector) GetLastSyncBatchID(jobName string) (int64, error) {
query := fmt.Sprintf("SELECT sync_batch_id FROM %s WHERE mirror_job_name = '%s'",
MirrorJobsTable, jobName)
q := c.client.Query(query)
q.DisableQueryCache = true
q.DefaultProjectID = c.projectID
q.DefaultDatasetID = c.datasetID
it, err := q.Read(c.ctx)
if err != nil {
err = fmt.Errorf("failed to run query %s on BigQuery:\n %w", query, err)
return -1, err
}

var row []bigquery.Value
err = it.Next(&row)
if err != nil {
c.logger.Info("no row found")
return 0, nil
}

if row[0] == nil {
c.logger.Info("no sync_batch_id found, returning 0")
return 0, nil
} else {
return row[0].(int64), nil
}
return c.pgMetadata.GetLastBatchID(jobName)
}

func (c *BigQueryConnector) GetLastNormalizeBatchID(jobName string) (int64, error) {
query := fmt.Sprintf("SELECT normalize_batch_id FROM %s WHERE mirror_job_name = '%s'",
MirrorJobsTable, jobName)
q := c.client.Query(query)
q.DisableQueryCache = true
q.DefaultProjectID = c.projectID
q.DefaultDatasetID = c.datasetID
it, err := q.Read(c.ctx)
if err != nil {
err = fmt.Errorf("failed to run query %s on BigQuery:\n %w", query, err)
return 0, err
}

var row []bigquery.Value
err = it.Next(&row)
if err != nil {
c.logger.Info("no row found for job")
return 0, nil
}

if row[0] != nil {
return row[0].(int64), nil
}
return 0, nil
return c.pgMetadata.GetLastNormalizeBatchID(jobName)
}

func (c *BigQueryConnector) getDistinctTableNamesInBatch(flowJobName string, syncBatchID int64,
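
The offset and batch-ID accessors above now simply delegate to the catalog store. For illustration, a catalog-backed lookup on the Postgres side could look like the following sketch; the table and column names (metadata_last_sync_state, job_name, last_offset) are hypothetical, not PeerDB's actual catalog schema:

    package example // illustrative only

    import (
        "context"
        "errors"

        "github.com/jackc/pgx/v5"
        "github.com/jackc/pgx/v5/pgxpool"
    )

    // fetchLastOffset sketches how a catalog-backed FetchLastOffset might read
    // a job's offset with pgx, treating a missing row as "start from zero".
    func fetchLastOffset(ctx context.Context, pool *pgxpool.Pool, jobName string) (int64, error) {
        var offset int64
        err := pool.QueryRow(ctx,
            "SELECT last_offset FROM metadata_last_sync_state WHERE job_name = $1",
            jobName,
        ).Scan(&offset)
        if errors.Is(err, pgx.ErrNoRows) {
            // No state recorded yet: mirror the removed BigQuery path, which
            // logged "no row found" and returned 0 instead of an error.
            return 0, nil
        }
        return offset, err
    }

Note the behavior carried over from the removed BigQuery queries: a missing row means a fresh mirror, so the getters return 0 rather than an error.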
@@ -591,20 +473,15 @@ func (c *BigQueryConnector) NormalizeRecords(req *model.NormalizeRecordsRequest)
return nil, fmt.Errorf("failed to get batch for the current mirror: %v", err)
}

hasJob, err := c.metadataHasJob(req.FlowJobName)
if err != nil {
return nil, fmt.Errorf("failed to check if job exists: %w", err)
}
// if the job is not yet present in the peerdb_mirror_jobs table
// OR sync is lagging behind normalize
if !hasJob || normBatchID >= req.SyncBatchID {
c.logger.Info("waiting for sync to catch up, so finishing")
// normalize has caught up with sync, chill until more records are loaded.
if normBatchID >= req.SyncBatchID {
return &model.NormalizeResponse{
Done: false,
StartBatchID: normBatchID,
EndBatchID: req.SyncBatchID,
}, nil
}

distinctTableNames, err := c.getDistinctTableNamesInBatch(
req.FlowJobName,
req.SyncBatchID,
@@ -662,17 +539,10 @@ func (c *BigQueryConnector) NormalizeRecords(req *model.NormalizeRecordsRequest)
}
}
}
// update metadata to make the last normalized batch id to the recent last sync batch id.
updateMetadataStmt := fmt.Sprintf(
"UPDATE %s SET normalize_batch_id=%d WHERE mirror_job_name='%s';",
MirrorJobsTable, req.SyncBatchID, req.FlowJobName)

query := c.client.Query(updateMetadataStmt)
query.DefaultProjectID = c.projectID
query.DefaultDatasetID = c.datasetID
_, err = query.Read(c.ctx)

err = c.pgMetadata.UpdateNormalizeBatchID(req.FlowJobName, req.SyncBatchID)
if err != nil {
return nil, fmt.Errorf("failed to execute update metadata statements %s: %v", updateMetadataStmt, err)
return nil, err
}

return &model.NormalizeResponse{
@@ -754,56 +624,6 @@ func (c *BigQueryConnector) CreateRawTable(req *protos.CreateRawTableInput) (*pr
}, nil
}

// getUpdateMetadataStmt returns a statement that upserts the metadata row for a given job.
func (c *BigQueryConnector) getUpdateMetadataStmt(jobName string, lastSyncedCheckpointID int64,
batchID int64,
) (string, error) {
hasJob, err := c.metadataHasJob(jobName)
if err != nil {
return "", fmt.Errorf("failed to check if job exists: %w", err)
}

// create the job in the metadata table
jobStatement := fmt.Sprintf(
"INSERT INTO %s (mirror_job_name,offset,sync_batch_id) VALUES ('%s',%d,%d);",
MirrorJobsTable, jobName, lastSyncedCheckpointID, batchID)
if hasJob {
jobStatement = fmt.Sprintf(
"UPDATE %s SET offset=GREATEST(offset,%d),sync_batch_id=%d WHERE mirror_job_name = '%s';",
MirrorJobsTable, lastSyncedCheckpointID, batchID, jobName)
}

return jobStatement, nil
}

// metadataHasJob checks if the metadata table has the given job.
func (c *BigQueryConnector) metadataHasJob(jobName string) (bool, error) {
checkStmt := fmt.Sprintf(
"SELECT COUNT(*) FROM %s WHERE mirror_job_name = '%s'",
MirrorJobsTable, jobName)

q := c.client.Query(checkStmt)
q.DefaultProjectID = c.projectID
q.DefaultDatasetID = c.datasetID
it, err := q.Read(c.ctx)
if err != nil {
return false, fmt.Errorf("failed to check if job exists: %w", err)
}

var row []bigquery.Value
err = it.Next(&row)
if err != nil {
return false, fmt.Errorf("failed read row: %w", err)
}

count, ok := row[0].(int64)
if !ok {
return false, fmt.Errorf("failed to convert count to int64")
}

return count > 0, nil
}

// SetupNormalizedTables sets up normalized tables, implementing the Connector interface.
// This runs CREATE TABLE IF NOT EXISTS on bigquery, using the schema and table name provided.
func (c *BigQueryConnector) SetupNormalizedTables(
@@ -911,22 +731,18 @@ func (c *BigQueryConnector) SetupNormalizedTables(
}

func (c *BigQueryConnector) SyncFlowCleanup(jobName string) error {
err := c.pgMetadata.DropMetadata(jobName)
if err != nil {
return fmt.Errorf("unable to clear metadata for sync flow cleanup: %w", err)
}

dataset := c.client.DatasetInProject(c.projectID, c.datasetID)
// deleting PeerDB-specific tables
err := dataset.Table(c.getRawTableName(jobName)).Delete(c.ctx)
err = dataset.Table(c.getRawTableName(jobName)).Delete(c.ctx)
if err != nil {
return fmt.Errorf("failed to delete raw table: %w", err)
}

// deleting job from metadata table
query := fmt.Sprintf("DELETE FROM %s WHERE mirror_job_name = '%s'", MirrorJobsTable, jobName)
queryHandler := c.client.Query(query)
queryHandler.DefaultProjectID = c.projectID
queryHandler.DefaultDatasetID = c.datasetID
_, err = queryHandler.Read(c.ctx)
if err != nil {
return fmt.Errorf("failed to delete job from metadata table: %w", err)
}
return nil
}

