Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Snowflake: stop storing metadata on warehouse; store in catalog #1179

Merged
merged 7 commits into from
Feb 1, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 0 additions & 6 deletions flow/connectors/bigquery/bigquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,6 @@ import (
)

const (
/*
Different batch Ids in code/BigQuery
1. batchID - identifier in raw table on target to depict which batch a row was inserted.
2. syncBatchID - batch id that was last synced or will be synced
3. normalizeBatchID - batch id that was last normalized or will be normalized.
*/
// MirrorJobsTable has the following schema:
// CREATE TABLE peerdb_mirror_jobs (
// mirror_job_id STRING NOT NULL,
Expand Down
4 changes: 2 additions & 2 deletions flow/connectors/clickhouse/qrep_avro_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,11 +195,11 @@ func (s *ClickhouseAvroSyncMethod) insertMetadata(
if err != nil {
s.connector.logger.Error("failed to create metadata insert statement",
slog.Any("error", err), partitionLog)
return fmt.Errorf("failed to create metadata insert statement: %v", err)
return fmt.Errorf("failed to create metadata insert statement: %w", err)
}

if _, err := s.connector.database.Exec(insertMetadataStmt); err != nil {
return fmt.Errorf("failed to execute metadata insert statement: %v", err)
return fmt.Errorf("failed to execute metadata insert statement: %w", err)
}

return nil
Expand Down
110 changes: 82 additions & 28 deletions flow/connectors/external_metadata/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@ import (
"context"
"fmt"
"log/slog"
"time"

"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgconn"
"github.com/jackc/pgx/v5/pgtype"
"google.golang.org/protobuf/encoding/protojson"

connpostgres "github.com/PeerDB-io/peer-flow/connectors/postgres"
"github.com/PeerDB-io/peer-flow/connectors/utils"
Expand All @@ -18,6 +20,7 @@ import (

const (
lastSyncStateTableName = "last_sync_state"
qrepTableName = "qrep_metadata"
)

type Querier interface {
Expand Down Expand Up @@ -118,29 +121,41 @@ func (p *PostgresMetadataStore) SetupMetadata() error {

// create the last sync state table
_, err = p.conn.Exec(p.ctx, `
CREATE TABLE IF NOT EXISTS `+p.QualifyTable(lastSyncStateTableName)+` (
CREATE TABLE IF NOT EXISTS `+p.QualifyTable(lastSyncStateTableName)+`(
job_name TEXT PRIMARY KEY NOT NULL,
last_offset BIGINT NOT NULL,
updated_at TIMESTAMP NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
sync_batch_id BIGINT NOT NULL,
normalize_batch_id BIGINT
)
`)
)`)
if err != nil && !utils.IsUniqueError(err) {
p.logger.Error("failed to create last sync state table", slog.Any("error", err))
return err
}

_, err = p.conn.Exec(p.ctx, `
CREATE TABLE IF NOT EXISTS `+p.QualifyTable(qrepTableName)+`(
job_name TEXT NOT NULL,
partition_id TEXT NOT NULL,
sync_partition JSON NOT NULL,
sync_start_time TIMESTAMPTZ NOT NULL,
sync_finish_time TIMESTAMPTZ NOT NULL DEFAULT NOW(),
PRIMARY KEY(job_name, partition_id)
)`)
if err != nil && !utils.IsUniqueError(err) {
p.logger.Error("failed to create qrep metadata table", slog.Any("error", err))
return err
}

p.logger.Info(fmt.Sprintf("created external metadata table %s.%s", p.schemaName, lastSyncStateTableName))
return nil
}

func (p *PostgresMetadataStore) FetchLastOffset(jobName string) (int64, error) {
row := p.conn.QueryRow(p.ctx, `
SELECT last_offset
FROM `+p.QualifyTable(lastSyncStateTableName)+`
WHERE job_name = $1
`, jobName)
row := p.conn.QueryRow(p.ctx,
`SELECT last_offset FROM `+
p.QualifyTable(lastSyncStateTableName)+
` WHERE job_name = $1`, jobName)
var offset pgtype.Int8
err := row.Scan(&offset)
if err != nil {
Expand All @@ -158,11 +173,10 @@ func (p *PostgresMetadataStore) FetchLastOffset(jobName string) (int64, error) {
}

func (p *PostgresMetadataStore) GetLastBatchID(jobName string) (int64, error) {
row := p.conn.QueryRow(p.ctx, `
SELECT sync_batch_id
FROM `+p.QualifyTable(lastSyncStateTableName)+`
WHERE job_name = $1
`, jobName)
row := p.conn.QueryRow(p.ctx,
`SELECT sync_batch_id FROM `+
p.QualifyTable(lastSyncStateTableName)+
` WHERE job_name = $1`, jobName)

var syncBatchID pgtype.Int8
err := row.Scan(&syncBatchID)
Expand All @@ -181,11 +195,10 @@ func (p *PostgresMetadataStore) GetLastBatchID(jobName string) (int64, error) {
}

func (p *PostgresMetadataStore) GetLastNormalizeBatchID(jobName string) (int64, error) {
rows := p.conn.QueryRow(p.ctx, `
SELECT normalize_batch_id
FROM `+p.schemaName+`.`+lastSyncStateTableName+`
WHERE job_name = $1
`, jobName)
rows := p.conn.QueryRow(p.ctx,
`SELECT normalize_batch_id FROM `+
p.QualifyTable(lastSyncStateTableName)+
` WHERE job_name = $1`, jobName)

var normalizeBatchID pgtype.Int8
err := rows.Scan(&normalizeBatchID)
Expand Down Expand Up @@ -242,10 +255,9 @@ func (p *PostgresMetadataStore) FinishBatch(jobName string, syncBatchID int64, o

func (p *PostgresMetadataStore) UpdateNormalizeBatchID(jobName string, batchID int64) error {
p.logger.Info("updating normalize batch id for job")
_, err := p.conn.Exec(p.ctx, `
UPDATE `+p.schemaName+`.`+lastSyncStateTableName+`
SET normalize_batch_id=$2 WHERE job_name=$1
`, jobName, batchID)
_, err := p.conn.Exec(p.ctx,
`UPDATE `+p.QualifyTable(lastSyncStateTableName)+
` SET normalize_batch_id=$2 WHERE job_name=$1`, jobName, batchID)
if err != nil {
p.logger.Error("failed to update normalize batch id", slog.Any("error", err))
return err
Expand All @@ -254,10 +266,52 @@ func (p *PostgresMetadataStore) UpdateNormalizeBatchID(jobName string, batchID i
return nil
}

func (p *PostgresMetadataStore) DropMetadata(jobName string) error {
_, err := p.conn.Exec(p.ctx, `
DELETE FROM `+p.QualifyTable(lastSyncStateTableName)+`
WHERE job_name = $1
`, jobName)
// FinishQrepPartition records that the given QRep partition has finished
// syncing for jobName. The partition is serialized with protojson and
// upserted into the qrep metadata table keyed on (job_name, partition_id);
// re-syncing the same partition refreshes sync_start_time and resets
// sync_finish_time to NOW().
func (p *PostgresMetadataStore) FinishQrepPartition(
	partition *protos.QRepPartition,
	jobName string,
	startTime time.Time,
) error {
	pbytes, err := protojson.Marshal(partition)
	if err != nil {
		return fmt.Errorf("failed to marshal partition to json: %w", err)
	}
	partitionJSON := string(pbytes)

	// Wrap the Exec error for consistency with IsQrepPartitionSynced,
	// instead of returning it bare.
	if _, err := p.conn.Exec(p.ctx,
		`INSERT INTO `+p.QualifyTable(qrepTableName)+
			`(job_name, partition_id, sync_partition, sync_start_time) VALUES ($1, $2, $3, $4)
			ON CONFLICT (job_name, partition_id) DO UPDATE SET sync_partition = $3, sync_start_time = $4, sync_finish_time = NOW()`,
		jobName, partition.PartitionId, partitionJSON, startTime); err != nil {
		return fmt.Errorf("failed to upsert qrep partition metadata: %w", err)
	}
	return nil
}

// IsQrepPartitionSynced reports whether a row for (jobName, partitionID)
// already exists in the qrep metadata table, i.e. whether that partition
// was previously recorded as synced for the job.
func (p *PostgresMetadataStore) IsQrepPartitionSynced(jobName string, partitionID string) (bool, error) {
	query := `SELECT EXISTS(SELECT * FROM ` +
		p.QualifyTable(qrepTableName) +
		` WHERE job_name = $1 AND partition_id = $2)`

	var synced bool
	if err := p.conn.QueryRow(p.ctx, query, jobName, partitionID).Scan(&synced); err != nil {
		return false, fmt.Errorf("failed to execute query: %w", err)
	}
	return synced, nil
}

// DropMetadata removes all metadata stored for jobName: its last-sync-state
// row and any qrep partition rows.
//
// NOTE(review): the two deletes are not wrapped in a transaction; if the
// second delete fails, the last-sync-state row stays deleted.
func (p *PostgresMetadataStore) DropMetadata(jobName string) error {
	// Wrap errors so the caller can tell which table's delete failed,
	// consistent with the error style used elsewhere in this store.
	if _, err := p.conn.Exec(p.ctx,
		`DELETE FROM `+p.QualifyTable(lastSyncStateTableName)+
			` WHERE job_name = $1`, jobName); err != nil {
		return fmt.Errorf("failed to delete last sync state for job %s: %w", jobName, err)
	}

	if _, err := p.conn.Exec(p.ctx,
		`DELETE FROM `+p.QualifyTable(qrepTableName)+
			` WHERE job_name = $1`, jobName); err != nil {
		return fmt.Errorf("failed to delete qrep metadata for job %s: %w", jobName, err)
	}

	return nil
}
17 changes: 2 additions & 15 deletions flow/connectors/s3/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,13 +155,7 @@ func (c *S3Connector) NeedsSetupMetadataTables() bool {
}

func (c *S3Connector) SetupMetadataTables() error {
err := c.pgMetadata.SetupMetadata()
if err != nil {
c.logger.Error("failed to setup metadata tables", slog.Any("error", err))
return err
}

return nil
return c.pgMetadata.SetupMetadata()
}

func (c *S3Connector) GetLastSyncBatchID(jobName string) (int64, error) {
Expand All @@ -172,15 +166,8 @@ func (c *S3Connector) GetLastOffset(jobName string) (int64, error) {
return c.pgMetadata.FetchLastOffset(jobName)
}

// update offset for a job
func (c *S3Connector) SetLastOffset(jobName string, offset int64) error {
err := c.pgMetadata.UpdateLastOffset(jobName, offset)
if err != nil {
c.logger.Error("failed to update last offset: ", slog.Any("error", err))
return err
}

return nil
return c.pgMetadata.UpdateLastOffset(jobName, offset)
}

func (c *S3Connector) SyncRecords(req *model.SyncRecordsRequest) (*model.SyncResponse, error) {
Expand Down
99 changes: 3 additions & 96 deletions flow/connectors/snowflake/qrep.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,21 +5,17 @@ import (
"fmt"
"log/slog"
"strings"
"time"

"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/s3"
"github.com/jackc/pgx/v5/pgtype"
"google.golang.org/protobuf/encoding/protojson"

"github.com/PeerDB-io/peer-flow/connectors/utils"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/model"
"github.com/PeerDB-io/peer-flow/shared"
)

const qRepMetadataTableName = "_peerdb_query_replication_metadata"

func (c *SnowflakeConnector) SyncQRepRecords(
config *protos.QRepConfig,
partition *protos.QRepPartition,
Expand All @@ -37,7 +33,7 @@ func (c *SnowflakeConnector) SyncQRepRecords(
}
c.logger.Info("Called QRep sync function and obtained table schema", flowLog)

done, err := c.isPartitionSynced(partition.PartitionId)
done, err := c.pgMetadata.IsQrepPartitionSynced(config.FlowJobName, partition.PartitionId)
if err != nil {
return 0, fmt.Errorf("failed to check if partition %s is synced: %w", partition.PartitionId, err)
}
Expand All @@ -51,30 +47,6 @@ func (c *SnowflakeConnector) SyncQRepRecords(
return avroSync.SyncQRepRecords(config, partition, tblSchema, stream)
}

// createMetadataInsertStatement builds the INSERT statement that records a
// synced QRep partition (as protojson) in the Snowflake metadata table.
func (c *SnowflakeConnector) createMetadataInsertStatement(
	partition *protos.QRepPartition,
	jobName string,
	startTime time.Time,
) (string, error) {
	// marshal the partition to json using protojson
	pbytes, err := protojson.Marshal(partition)
	if err != nil {
		// use %w so callers can unwrap the underlying marshal error
		return "", fmt.Errorf("failed to marshal partition to json: %w", err)
	}

	// Escape single quotes: the JSON payload is spliced into a quoted SQL
	// literal below, and an unescaped ' would break (or inject into) the
	// statement.
	partitionJSON := strings.ReplaceAll(string(pbytes), "'", "''")

	insertMetadataStmt := fmt.Sprintf(
		`INSERT INTO %s.%s
			(flowJobName, partitionID, syncPartition, syncStartTime, syncFinishTime)
			VALUES ('%s', '%s', '%s', '%s'::timestamp, CURRENT_TIMESTAMP);`,
		c.metadataSchema, qRepMetadataTableName, jobName, partition.PartitionId,
		partitionJSON, startTime.Format(time.RFC3339))

	return insertMetadataStmt, nil
}

func (c *SnowflakeConnector) getTableSchema(tableName string) ([]*sql.ColumnType, error) {
schematable, err := utils.ParseSchemaTable(tableName)
if err != nil {
Expand All @@ -99,49 +71,13 @@ func (c *SnowflakeConnector) getTableSchema(tableName string) ([]*sql.ColumnType
return columnTypes, nil
}

// isPartitionSynced reports whether a row for partitionID already exists in
// the QRep metadata table, i.e. whether that partition was already synced.
func (c *SnowflakeConnector) isPartitionSynced(partitionID string) (bool, error) {
	// Bind partitionID as a query parameter instead of interpolating it into
	// the SQL string (the previous Sprintf form needed a nolint:gosec).
	queryString := fmt.Sprintf(`
	SELECT COUNT(*)
	FROM %s.%s
	WHERE partitionID = ?
	`, c.metadataSchema, qRepMetadataTableName)

	row := c.database.QueryRow(queryString, partitionID)

	var count pgtype.Int8
	if err := row.Scan(&count); err != nil {
		return false, fmt.Errorf("failed to execute query: %w", err)
	}

	return count.Int64 > 0, nil
}

func (c *SnowflakeConnector) SetupQRepMetadataTables(config *protos.QRepConfig) error {
// NOTE that Snowflake does not support transactional DDL
createMetadataTablesTx, err := c.database.BeginTx(c.ctx, nil)
if err != nil {
return fmt.Errorf("unable to begin transaction for creating metadata tables: %w", err)
}
// in case we return after error, ensure transaction is rolled back
defer func() {
deferErr := createMetadataTablesTx.Rollback()
if deferErr != sql.ErrTxDone && deferErr != nil {
c.logger.Error("error while rolling back transaction for creating metadata tables",
slog.Any("error", deferErr))
}
}()
err = c.createPeerDBInternalSchema(createMetadataTablesTx)
if err != nil {
return err
}
err = c.createQRepMetadataTable(createMetadataTablesTx)
_, err := c.database.ExecContext(c.ctx, fmt.Sprintf(createSchemaSQL, c.rawSchema))
if err != nil {
return err
}

stageName := c.getStageNameForJob(config.FlowJobName)

err = c.createStage(stageName, config)
if err != nil {
return err
Expand All @@ -154,35 +90,6 @@ func (c *SnowflakeConnector) SetupQRepMetadataTables(config *protos.QRepConfig)
}
}

err = createMetadataTablesTx.Commit()
if err != nil {
return fmt.Errorf("unable to commit transaction for creating metadata tables: %w", err)
}

return nil
}

// createQRepMetadataTable creates the QRep metadata table (if it does not
// already exist) inside the supplied transaction.
func (c *SnowflakeConnector) createQRepMetadataTable(createMetadataTableTx *sql.Tx) error {
	// Define the schema
	queryString := fmt.Sprintf(`
	CREATE TABLE IF NOT EXISTS %s.%s (
			flowJobName STRING,
			partitionID STRING,
			syncPartition STRING,
			syncStartTime TIMESTAMP_LTZ,
			syncFinishTime TIMESTAMP_LTZ
	);
	`, c.metadataSchema, qRepMetadataTableName)

	if _, err := createMetadataTableTx.Exec(queryString); err != nil {
		c.logger.Error(fmt.Sprintf("failed to create table %s.%s", c.metadataSchema, qRepMetadataTableName),
			slog.Any("error", err))
		return fmt.Errorf("failed to create table %s.%s: %w", c.metadataSchema, qRepMetadataTableName, err)
	}

	c.logger.Info(fmt.Sprintf("Created table %s", qRepMetadataTableName))
	return nil
}

Expand Down Expand Up @@ -371,5 +278,5 @@ func (c *SnowflakeConnector) dropStage(stagingPath string, job string) error {
}

func (c *SnowflakeConnector) getStageNameForJob(job string) string {
return fmt.Sprintf("%s.peerdb_stage_%s", c.metadataSchema, job)
return fmt.Sprintf("%s.peerdb_stage_%s", c.rawSchema, job)
}
Loading
Loading