BigQuery: stop storing metadata on warehouse; store in catalog #1191

Merged
9 commits merged on Feb 1, 2024
232 changes: 21 additions & 211 deletions flow/connectors/bigquery/bigquery.go
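
For review context: the catalog-backed store surface this change starts calling, reconstructed purely from the call sites in the diff below (the store is built once in the constructor via metadataStore.NewPostgresMetadataStoreFromCatalog). The real definitions live in connectors/external_metadata and may differ in detail; this is an inferred sketch, not the actual interface.

// Reconstructed from this diff's call sites, not copied from
// connectors/external_metadata; signatures are inferred, not verified.
type catalogMetadataSurface interface {
	FetchLastOffset(jobName string) (int64, error)
	UpdateLastOffset(jobName string, offset int64) error
	GetLastBatchID(jobName string) (int64, error)
	GetLastNormalizeBatchID(jobName string) (int64, error)
	UpdateNormalizeBatchID(jobName string, batchID int64) error
	DropMetadata(jobName string) error
}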
@@ -18,6 +18,7 @@ import (
"google.golang.org/api/iterator"
"google.golang.org/api/option"

metadataStore "github.com/PeerDB-io/peer-flow/connectors/external_metadata"
"github.com/PeerDB-io/peer-flow/connectors/utils"
cc "github.com/PeerDB-io/peer-flow/connectors/utils/catalog"
"github.com/PeerDB-io/peer-flow/generated/protos"
@@ -27,20 +28,6 @@ import (
)

const (
/*
Different batch Ids in code/BigQuery
1. batchID - identifier in raw table on target to depict which batch a row was inserted.
3. syncBatchID - batch id that was last synced or will be synced
4. normalizeBatchID - batch id that was last normalized or will be normalized.
*/
// MirrorJobsTable has the following schema:
// CREATE TABLE peerdb_mirror_jobs (
// mirror_job_id STRING NOT NULL,
// offset INTEGER NOT NULL,
// sync_batch_id INTEGER NOT NULL
// normalize_batch_id INTEGER
// )
MirrorJobsTable = "peerdb_mirror_jobs"
SyncRecordsBatchSize = 1024
)

@@ -63,6 +50,7 @@ type BigQueryConnector struct {
bqConfig *protos.BigqueryConfig
client *bigquery.Client
storageClient *storage.Client
pgMetadata *metadataStore.PostgresMetadataStore
datasetID string
projectID string
catalogPool *pgxpool.Pool
@@ -242,6 +230,7 @@ func NewBigQueryConnector(ctx context.Context, config *protos.BigqueryConfig) (*
client: client,
datasetID: datasetID,
projectID: projectID,
pgMetadata: metadataStore.NewPostgresMetadataStoreFromCatalog(ctx, catalogPool),
storageClient: storageClient,
catalogPool: catalogPool,
logger: *slog.With(slog.String(string(shared.FlowNameKey), flowName)),
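
Worth noting: the store reuses the catalog pool the connector already holds, so no new connection setup is introduced. A minimal sketch of the wiring inside the constructor, where cc.GetCatalogConnectionPoolFromEnv is an assumed helper behind the cc import above, not something shown in this diff:

// Sketch only: obtain the catalog pool and wrap it in the metadata store.
// cc.GetCatalogConnectionPoolFromEnv is assumed, not shown in this diff.
catalogPool, err := cc.GetCatalogConnectionPoolFromEnv()
if err != nil {
	return nil, fmt.Errorf("failed to get catalog connection pool: %w", err)
}
pgMetadata := metadataStore.NewPostgresMetadataStoreFromCatalog(ctx, catalogPool)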
@@ -269,10 +258,8 @@ func (c *BigQueryConnector) ConnectionActive() error {
return nil
}

// NeedsSetupMetadataTables returns true if the metadata tables need to be set up.
func (c *BigQueryConnector) NeedsSetupMetadataTables() bool {
_, err := c.client.DatasetInProject(c.projectID, c.datasetID).Table(MirrorJobsTable).Metadata(c.ctx)
return err != nil
return false
}

func (c *BigQueryConnector) waitForTableReady(datasetTable *datasetTable) error {
@@ -330,135 +317,24 @@ func (c *BigQueryConnector) ReplayTableSchemaDeltas(flowJobName string,
return nil
}

// SetupMetadataTables sets up the metadata tables.
func (c *BigQueryConnector) SetupMetadataTables() error {
// check if the dataset exists
dataset := c.client.DatasetInProject(c.projectID, c.datasetID)
if _, err := dataset.Metadata(c.ctx); err != nil {
// create the dataset as it doesn't exist
if err := dataset.Create(c.ctx, nil); err != nil {
return fmt.Errorf("failed to create dataset %s: %w", c.datasetID, err)
}
}

// Create the mirror jobs table, NeedsSetupMetadataTables ensures it doesn't exist.
mirrorJobsTable := dataset.Table(MirrorJobsTable)
mirrorJobsTableMetadata := &bigquery.TableMetadata{
Schema: bigquery.Schema{
{Name: "mirror_job_name", Type: bigquery.StringFieldType},
{Name: "offset", Type: bigquery.IntegerFieldType},
{Name: "sync_batch_id", Type: bigquery.IntegerFieldType},
{Name: "normalize_batch_id", Type: bigquery.IntegerFieldType},
},
}
if err := mirrorJobsTable.Create(c.ctx, mirrorJobsTableMetadata); err != nil {
// if the table already exists, ignore the error
if !strings.Contains(err.Error(), "Already Exists") {
return fmt.Errorf("failed to create table %s: %w", MirrorJobsTable, err)
} else {
c.logger.Info(fmt.Sprintf("table %s already exists", MirrorJobsTable))
}
}

return nil
}

func (c *BigQueryConnector) GetLastOffset(jobName string) (int64, error) {
query := fmt.Sprintf("SELECT offset FROM %s WHERE mirror_job_name = '%s'", MirrorJobsTable, jobName)
q := c.client.Query(query)
q.DefaultProjectID = c.projectID
q.DefaultDatasetID = c.datasetID
it, err := q.Read(c.ctx)
if err != nil {
err = fmt.Errorf("failed to run query %s on BigQuery:\n %w", query, err)
return 0, err
}

var row []bigquery.Value
err = it.Next(&row)
if err != nil {
c.logger.Info("no row found, returning nil")
return 0, nil
}

if row[0] == nil {
c.logger.Info("no offset found, returning nil")
return 0, nil
} else {
return row[0].(int64), nil
}
return c.pgMetadata.FetchLastOffset(jobName)
}

func (c *BigQueryConnector) SetLastOffset(jobName string, lastOffset int64) error {
query := fmt.Sprintf(
"UPDATE %s SET offset = GREATEST(offset, %d) WHERE mirror_job_name = '%s'",
MirrorJobsTable,
lastOffset,
jobName,
)
q := c.client.Query(query)
q.DefaultProjectID = c.projectID
q.DefaultDatasetID = c.datasetID
_, err := q.Read(c.ctx)
if err != nil {
return fmt.Errorf("failed to run query %s on BigQuery:\n %w", query, err)
}

return nil
func (c *BigQueryConnector) SetLastOffset(jobName string, offset int64) error {
return c.pgMetadata.UpdateLastOffset(jobName, offset)
}

func (c *BigQueryConnector) GetLastSyncBatchID(jobName string) (int64, error) {
query := fmt.Sprintf("SELECT sync_batch_id FROM %s WHERE mirror_job_name = '%s'",
MirrorJobsTable, jobName)
q := c.client.Query(query)
q.DisableQueryCache = true
q.DefaultProjectID = c.projectID
q.DefaultDatasetID = c.datasetID
it, err := q.Read(c.ctx)
if err != nil {
err = fmt.Errorf("failed to run query %s on BigQuery:\n %w", query, err)
return -1, err
}

var row []bigquery.Value
err = it.Next(&row)
if err != nil {
c.logger.Info("no row found")
return 0, nil
}

if row[0] == nil {
c.logger.Info("no sync_batch_id found, returning 0")
return 0, nil
} else {
return row[0].(int64), nil
}
return c.pgMetadata.GetLastBatchID(jobName)
}

func (c *BigQueryConnector) GetLastNormalizeBatchID(jobName string) (int64, error) {
query := fmt.Sprintf("SELECT normalize_batch_id FROM %s WHERE mirror_job_name = '%s'",
MirrorJobsTable, jobName)
q := c.client.Query(query)
q.DisableQueryCache = true
q.DefaultProjectID = c.projectID
q.DefaultDatasetID = c.datasetID
it, err := q.Read(c.ctx)
if err != nil {
err = fmt.Errorf("failed to run query %s on BigQuery:\n %w", query, err)
return 0, err
}

var row []bigquery.Value
err = it.Next(&row)
if err != nil {
c.logger.Info("no row found for job")
return 0, nil
}

if row[0] != nil {
return row[0].(int64), nil
}
return 0, nil
return c.pgMetadata.GetLastNormalizeBatchID(jobName)
}

func (c *BigQueryConnector) getDistinctTableNamesInBatch(flowJobName string, syncBatchID int64,
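
The diff does not show the Postgres side, but a catalog-backed FetchLastOffset would plausibly look like the sketch below, assuming the store keeps the context and pgx/v5 pool it was constructed with. The table and column names are placeholders; only the contract (return 0 when no row exists, matching the removed BigQuery code) is taken from this diff.

// Illustrative only; the real PostgresMetadataStore lives in
// connectors/external_metadata. Placeholder schema throughout.
type PostgresMetadataStore struct {
	ctx  context.Context
	pool *pgxpool.Pool
}

func (p *PostgresMetadataStore) FetchLastOffset(jobName string) (int64, error) {
	var offset int64
	err := p.pool.QueryRow(p.ctx,
		`SELECT last_offset FROM metadata_last_sync_state WHERE job_name = $1`,
		jobName).Scan(&offset)
	if errors.Is(err, pgx.ErrNoRows) {
		// Same contract as the removed BigQuery lookup: no row yet means
		// no offset recorded, so return 0 rather than an error.
		return 0, nil
	} else if err != nil {
		return 0, err
	}
	return offset, nil
}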
@@ -597,20 +473,15 @@ func (c *BigQueryConnector) NormalizeRecords(req *model.NormalizeRecordsRequest)
return nil, fmt.Errorf("failed to get batch for the current mirror: %v", err)
}

hasJob, err := c.metadataHasJob(req.FlowJobName)
if err != nil {
return nil, fmt.Errorf("failed to check if job exists: %w", err)
}
// if job is not yet found in the peerdb_mirror_jobs_table
// OR sync is lagging end normalize
if !hasJob || normBatchID >= req.SyncBatchID {
c.logger.Info("waiting for sync to catch up, so finishing")
// normalize has caught up with sync, chill until more records are loaded.
if normBatchID >= req.SyncBatchID {
return &model.NormalizeResponse{
Done: false,
StartBatchID: normBatchID,
EndBatchID: req.SyncBatchID,
}, nil
}

distinctTableNames, err := c.getDistinctTableNamesInBatch(
req.FlowJobName,
req.SyncBatchID,
@@ -668,17 +539,10 @@ func (c *BigQueryConnector) NormalizeRecords(req *model.NormalizeRecordsRequest)
}
}
}
// update metadata to make the last normalized batch id to the recent last sync batch id.
updateMetadataStmt := fmt.Sprintf(
"UPDATE %s SET normalize_batch_id=%d WHERE mirror_job_name='%s';",
MirrorJobsTable, req.SyncBatchID, req.FlowJobName)

query := c.client.Query(updateMetadataStmt)
query.DefaultProjectID = c.projectID
query.DefaultDatasetID = c.datasetID
_, err = query.Read(c.ctx)

err = c.pgMetadata.UpdateNormalizeBatchID(req.FlowJobName, req.SyncBatchID)
if err != nil {
return nil, fmt.Errorf("failed to execute update metadata statements %s: %v", updateMetadataStmt, err)
return nil, err
}

return &model.NormalizeResponse{
@@ -760,56 +624,6 @@ func (c *BigQueryConnector) CreateRawTable(req *protos.CreateRawTableInput) (*pr
}, nil
}
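
Correspondingly, the normalize-side bookkeeping that used to be a hand-built BigQuery UPDATE statement (removed in the hunk above) becomes a single catalog write behind UpdateNormalizeBatchID. A sketch, continuing the placeholder schema from the earlier sketch:

// Illustrative counterpart of the removed BigQuery UPDATE: advance the
// normalize batch ID for a mirror in the catalog. Placeholder schema.
func (p *PostgresMetadataStore) UpdateNormalizeBatchID(jobName string, batchID int64) error {
	_, err := p.pool.Exec(p.ctx,
		`UPDATE metadata_last_sync_state SET normalize_batch_id = $2
		 WHERE job_name = $1`,
		jobName, batchID)
	return err
}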

// getUpdateMetadataStmt updates the metadata tables for a given job.
func (c *BigQueryConnector) getUpdateMetadataStmt(jobName string, lastSyncedCheckpointID int64,
batchID int64,
) (string, error) {
hasJob, err := c.metadataHasJob(jobName)
if err != nil {
return "", fmt.Errorf("failed to check if job exists: %w", err)
}

// create the job in the metadata table
jobStatement := fmt.Sprintf(
"INSERT INTO %s (mirror_job_name,offset,sync_batch_id) VALUES ('%s',%d,%d);",
MirrorJobsTable, jobName, lastSyncedCheckpointID, batchID)
if hasJob {
jobStatement = fmt.Sprintf(
"UPDATE %s SET offset=GREATEST(offset,%d),sync_batch_id=%d WHERE mirror_job_name = '%s';",
MirrorJobsTable, lastSyncedCheckpointID, batchID, jobName)
}

return jobStatement, nil
}

// metadataHasJob checks if the metadata table has the given job.
func (c *BigQueryConnector) metadataHasJob(jobName string) (bool, error) {
checkStmt := fmt.Sprintf(
"SELECT COUNT(*) FROM %s WHERE mirror_job_name = '%s'",
MirrorJobsTable, jobName)

q := c.client.Query(checkStmt)
q.DefaultProjectID = c.projectID
q.DefaultDatasetID = c.datasetID
it, err := q.Read(c.ctx)
if err != nil {
return false, fmt.Errorf("failed to check if job exists: %w", err)
}

var row []bigquery.Value
err = it.Next(&row)
if err != nil {
return false, fmt.Errorf("failed read row: %w", err)
}

count, ok := row[0].(int64)
if !ok {
return false, fmt.Errorf("failed to convert count to int64")
}

return count > 0, nil
}
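
The removed getUpdateMetadataStmt/metadataHasJob pair existed to emulate insert-or-update by hand: check whether the job row exists, then emit either an INSERT or an UPDATE with a GREATEST guard on the offset. On Postgres the same logic collapses into one upsert. A speculative sketch of what the sync-side write could look like; FinishBatch is a hypothetical method name and the schema is a placeholder, only the GREATEST guard is carried over from the removed code:

// Illustrative upsert replacing the removed two-step job check + DML.
// ON CONFLICT keeps the max offset, as the GREATEST guard did before.
func (p *PostgresMetadataStore) FinishBatch(jobName string, offset int64, batchID int64) error {
	_, err := p.pool.Exec(p.ctx,
		`INSERT INTO metadata_last_sync_state (job_name, last_offset, sync_batch_id)
		 VALUES ($1, $2, $3)
		 ON CONFLICT (job_name) DO UPDATE
		 SET last_offset = GREATEST(metadata_last_sync_state.last_offset, $2),
		     sync_batch_id = $3`,
		jobName, offset, batchID)
	return err
}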

// SetupNormalizedTables sets up normalized tables, implementing the Connector interface.
// This runs CREATE TABLE IF NOT EXISTS on bigquery, using the schema and table name provided.
func (c *BigQueryConnector) SetupNormalizedTables(
@@ -917,22 +731,18 @@ func (c *BigQueryConnector) SetupNormalizedTables(
}

func (c *BigQueryConnector) SyncFlowCleanup(jobName string) error {
err := c.pgMetadata.DropMetadata(jobName)
if err != nil {
return fmt.Errorf("unable to clear metadata for sync flow cleanup: %w", err)
}

dataset := c.client.DatasetInProject(c.projectID, c.datasetID)
// deleting PeerDB specific tables
err := dataset.Table(c.getRawTableName(jobName)).Delete(c.ctx)
err = dataset.Table(c.getRawTableName(jobName)).Delete(c.ctx)
if err != nil {
return fmt.Errorf("failed to delete raw table: %w", err)
}

// deleting job from metadata table
query := fmt.Sprintf("DELETE FROM %s WHERE mirror_job_name = '%s'", MirrorJobsTable, jobName)
queryHandler := c.client.Query(query)
queryHandler.DefaultProjectID = c.projectID
queryHandler.DefaultDatasetID = c.datasetID
_, err = queryHandler.Read(c.ctx)
if err != nil {
return fmt.Errorf("failed to delete job from metadata table: %w", err)
}
return nil
}
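
Cleanup now clears the catalog state first and only then drops the BigQuery raw table; a failure in either step aborts with a wrapped error. The catalog-side delete behind DropMetadata is presumably a one-liner along these lines, with the same placeholder table name as the sketches above:

// Illustrative catalog-side cleanup matching c.pgMetadata.DropMetadata:
// remove the mirror's state row. Placeholder table name as above.
func (p *PostgresMetadataStore) DropMetadata(jobName string) error {
	_, err := p.pool.Exec(p.ctx,
		`DELETE FROM metadata_last_sync_state WHERE job_name = $1`,
		jobName)
	return err
}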
