sf, s3, sqlserver, snapshot_flow, utils
Amogh-Bharadwaj committed Dec 8, 2023
1 parent b0d6bff commit 1dbd73d
Showing 14 changed files with 205 additions and 278 deletions.
5 changes: 2 additions & 3 deletions flow/connectors/core.go
@@ -4,8 +4,7 @@ import (
"context"
"errors"
"fmt"

log "github.com/sirupsen/logrus"
"log/slog"

connbigquery "github.com/PeerDB-io/peer-flow/connectors/bigquery"
conneventhub "github.com/PeerDB-io/peer-flow/connectors/eventhub"
@@ -264,6 +263,6 @@ func CloseConnector(conn Connector) {

err := conn.Close()
if err != nil {
log.Errorf("error closing connector: %v", err)
slog.Error("error closing connector", slog.Any("error", err))
}
}
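The change above swaps logrus's formatted logging for the standard library's structured log/slog. A minimal, self-contained sketch of that pattern follows; the closeResource helper and its error are illustrative stand-ins, not PeerDB code.

```go
package main

import (
	"errors"
	"log/slog"
)

// closeResource mirrors the CloseConnector change: instead of interpolating
// the error into the message (logrus's Errorf), the error is attached as a
// structured attribute that handlers can render or index separately.
func closeResource(close func() error) {
	if err := close(); err != nil {
		slog.Error("error closing connector", slog.Any("error", err))
	}
}

func main() {
	closeResource(func() error { return errors.New("connection already closed") })
}
```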
12 changes: 6 additions & 6 deletions flow/connectors/s3/qrep.go
@@ -2,13 +2,14 @@ package conns3

import (
"fmt"
"log/slog"

"github.com/PeerDB-io/peer-flow/connectors/utils"
avro "github.com/PeerDB-io/peer-flow/connectors/utils/avro"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/model"
"github.com/PeerDB-io/peer-flow/model/qvalue"
log "github.com/sirupsen/logrus"
"github.com/PeerDB-io/peer-flow/shared"
)

func (c *S3Connector) SyncQRepRecords(
@@ -18,10 +19,9 @@ func (c *S3Connector) SyncQRepRecords(
) (int, error) {
schema, err := stream.Schema()
if err != nil {
log.WithFields(log.Fields{
"flowName": config.FlowJobName,
"partitionID": partition.PartitionId,
}).Errorf("failed to get schema from stream: %v", err)
c.logger.Error("failed to get schema from stream",
slog.Any("error", err),
slog.String(string(shared.PartitionIdKey), partition.PartitionId))
return 0, fmt.Errorf("failed to get schema from stream: %w", err)
}

@@ -75,6 +75,6 @@ func (c *S3Connector) writeToAvroFile(

// S3 just sets up destination, not metadata tables
func (c *S3Connector) SetupQRepMetadataTables(config *protos.QRepConfig) error {
log.Infof("QRep metadata setup not needed for S3.")
c.logger.Info("QRep metadata setup not needed for S3.")
return nil
}
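Here the log.WithFields(log.Fields{...}) calls become per-call slog attributes. A rough sketch of the same idea, assuming a plain string key in place of shared.PartitionIdKey:

```go
package main

import (
	"fmt"
	"log/slog"
)

// partitionIDKey is a stand-in for shared.PartitionIdKey from PeerDB's shared
// package; only the logging pattern is reproduced here.
const partitionIDKey = "partitionID"

func logSchemaFailure(logger *slog.Logger, partitionID string, err error) {
	// Equivalent of log.WithFields(...).Errorf(...): a message plus attributes.
	logger.Error("failed to get schema from stream",
		slog.Any("error", err),
		slog.String(partitionIDKey, partitionID))
}

func main() {
	logSchemaFailure(slog.Default(), "partition-42", fmt.Errorf("stream closed"))
}
```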
29 changes: 16 additions & 13 deletions flow/connectors/s3/s3.go
@@ -3,16 +3,17 @@ package conns3
import (
"context"
"fmt"
"log/slog"
"strings"
"time"

metadataStore "github.com/PeerDB-io/peer-flow/connectors/external_metadata"
"github.com/PeerDB-io/peer-flow/connectors/utils"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/model"
"github.com/PeerDB-io/peer-flow/shared"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/s3"
log "github.com/sirupsen/logrus"
)

const (
Expand All @@ -25,6 +26,7 @@ type S3Connector struct {
pgMetadata *metadataStore.PostgresMetadataStore
client s3.S3
creds utils.S3PeerCredentials
logger slog.Logger
}

func NewS3Connector(ctx context.Context,
@@ -64,31 +66,32 @@ func NewS3Connector(ctx context.Context,
pgMetadata, err := metadataStore.NewPostgresMetadataStore(ctx,
config.GetMetadataDb(), metadataSchemaName)
if err != nil {
log.Errorf("failed to create postgres metadata store: %v", err)
slog.ErrorContext(ctx, "failed to create postgres metadata store", slog.Any("error", err))
return nil, err
}

flowName, _ := ctx.Value(shared.FlowNameKey).(string)
return &S3Connector{
ctx: ctx,
url: config.Url,
pgMetadata: pgMetadata,
client: *s3Client,
creds: s3PeerCreds,
logger: *slog.With(slog.String(string(shared.FlowNameKey), flowName)),
}, nil
}

func (c *S3Connector) CreateRawTable(req *protos.CreateRawTableInput) (*protos.CreateRawTableOutput, error) {
log.Infof("CreateRawTable for S3 is a no-op")
c.logger.Info("CreateRawTable for S3 is a no-op")
return nil, nil
}

func (c *S3Connector) InitializeTableSchema(req map[string]*protos.TableSchema) error {
log.Infof("InitializeTableSchema for S3 is a no-op")
c.logger.Info("InitializeTableSchema for S3 is a no-op")
return nil
}

func (c *S3Connector) Close() error {
log.Debugf("Closing metadata store connection")
c.logger.Debug("Closing metadata store connection")
return c.pgMetadata.Close()
}

@@ -142,7 +145,7 @@ func (c *S3Connector) ConnectionActive() error {

validErr := ValidCheck(&c.client, c.url, c.pgMetadata)
if validErr != nil {
log.Errorf("failed to validate s3 connector: %v", validErr)
c.logger.Error("failed to validate s3 connector:", slog.Any("error", validErr))
return validErr
}

@@ -156,7 +159,7 @@ func (c *S3Connector) NeedsSetupMetadataTables() bool {
func (c *S3Connector) SetupMetadataTables() error {
err := c.pgMetadata.SetupMetadata()
if err != nil {
log.Errorf("failed to setup metadata tables: %v", err)
c.logger.Error("failed to setup metadata tables", slog.Any("error", err))
return err
}

@@ -185,7 +188,7 @@ func (c *S3Connector) GetLastOffset(jobName string) (*protos.LastSyncState, erro
func (c *S3Connector) updateLastOffset(jobName string, offset int64) error {
err := c.pgMetadata.UpdateLastOffset(jobName, offset)
if err != nil {
log.Errorf("failed to update last offset: %v", err)
c.logger.Error("failed to update last offset: ", slog.Any("error", err))
return err
}

@@ -217,7 +220,7 @@ func (c *S3Connector) SyncRecords(req *model.SyncRecordsRequest) (*model.SyncRes
if err != nil {
return nil, err
}
log.Infof("Synced %d records", numRecords)
c.logger.Info(fmt.Sprintf("Synced %d records", numRecords))

lastCheckpoint, err := req.Records.GetLastCheckpoint()
if err != nil {
@@ -226,12 +229,12 @@ func (c *S3Connector) SyncRecords(req *model.SyncRecordsRequest) (*model.SyncRes

err = c.updateLastOffset(req.FlowJobName, lastCheckpoint)
if err != nil {
log.Errorf("failed to update last offset for s3 cdc: %v", err)
c.logger.Error("failed to update last offset for s3 cdc", slog.Any("error", err))
return nil, err
}
err = c.pgMetadata.IncrementID(req.FlowJobName)
if err != nil {
log.Errorf("%v", err)
c.logger.Error("failed to increment id", slog.Any("error", err))
return nil, err
}

@@ -246,7 +249,7 @@ func (c *S3Connector) SyncRecords(req *model.SyncRecordsRequest) (*model.SyncRes
func (c *S3Connector) SetupNormalizedTables(req *protos.SetupNormalizedTableBatchInput) (
*protos.SetupNormalizedTableBatchOutput,
error) {
log.Infof("SetupNormalizedTables for S3 is a no-op")
c.logger.Info("SetupNormalizedTables for S3 is a no-op")
return nil, nil
}

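The S3 connector now carries a flow-scoped logger built once in the constructor with slog.With, so the flow name no longer has to be repeated at every call site; slog.ErrorContext is used for failures that happen before that logger exists. A simplified sketch of the construction, with placeholder context-key and struct types:

```go
package main

import (
	"context"
	"log/slog"
)

// ctxKey / flowNameKey stand in for shared.FlowNameKey; the real key lives in
// PeerDB's shared package.
type ctxKey string

const flowNameKey ctxKey = "flowName"

// connector mirrors the S3Connector change: the logger field already has the
// flow name attached, so methods only supply the message and extra attributes.
type connector struct {
	logger slog.Logger
}

func newConnector(ctx context.Context) *connector {
	flowName, _ := ctx.Value(flowNameKey).(string)
	return &connector{
		// slog.With derives a logger from the default one with the given
		// attributes pre-applied to every record it emits.
		logger: *slog.With(slog.String(string(flowNameKey), flowName)),
	}
}

func (c *connector) close() {
	c.logger.Debug("Closing metadata store connection")
}

func main() {
	ctx := context.WithValue(context.Background(), flowNameKey, "my-flow")
	newConnector(ctx).close()
}
```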
77 changes: 27 additions & 50 deletions flow/connectors/snowflake/qrep.go
@@ -3,16 +3,17 @@ package connsnowflake
import (
"database/sql"
"fmt"
"log/slog"
"strings"
"time"

"github.com/PeerDB-io/peer-flow/connectors/utils"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/model"
"github.com/PeerDB-io/peer-flow/shared"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/aws/aws-sdk-go/service/s3/s3manager"
log "github.com/sirupsen/logrus"
"google.golang.org/protobuf/encoding/protojson"
)

@@ -25,27 +26,23 @@ func (c *SnowflakeConnector) SyncQRepRecords(
) (int, error) {
// Ensure the destination table is available.
destTable := config.DestinationTableIdentifier

flowLog := slog.Group("sync_metadata",
slog.String(string(shared.PartitionIdKey), partition.PartitionId),
slog.String("destinationTable", destTable),
)
tblSchema, err := c.getTableSchema(destTable)
if err != nil {
return 0, fmt.Errorf("failed to get schema of table %s: %w", destTable, err)
}
log.WithFields(log.Fields{
"flowName": config.FlowJobName,
"partition": partition.PartitionId,
}).Infof("Called QRep sync function and "+
"obtained table schema for destination table %s",
destTable)
c.logger.Info("Called QRep sync function and obtained table schema", flowLog)

done, err := c.isPartitionSynced(partition.PartitionId)
if err != nil {
return 0, fmt.Errorf("failed to check if partition %s is synced: %w", partition.PartitionId, err)
}

if done {
log.WithFields(log.Fields{
"flowName": config.FlowJobName,
}).Infof("Partition %s has already been synced", partition.PartitionId)
c.logger.Info("Partition has already been synced", flowLog)
return 0, nil
}

@@ -127,7 +124,8 @@ func (c *SnowflakeConnector) SetupQRepMetadataTables(config *protos.QRepConfig)
defer func() {
deferErr := createMetadataTablesTx.Rollback()
if deferErr != sql.ErrTxDone && deferErr != nil {
log.Errorf("unexpected error while rolling back transaction for creating metadata tables: %v", deferErr)
c.logger.Error("error while rolling back transaction for creating metadata tables",
slog.Any("error", deferErr))
}
}()
err = c.createPeerDBInternalSchema(createMetadataTablesTx)
@@ -176,11 +174,12 @@ func (c *SnowflakeConnector) createQRepMetadataTable(createMetadataTableTx *sql.

_, err := createMetadataTableTx.Exec(queryString)
if err != nil {
log.Errorf("failed to create table %s.%s: %v", c.metadataSchema, qRepMetadataTableName, err)
c.logger.Error(fmt.Sprintf("failed to create table %s.%s", c.metadataSchema, qRepMetadataTableName),
slog.Any("error", err))
return fmt.Errorf("failed to create table %s.%s: %w", c.metadataSchema, qRepMetadataTableName, err)
}

log.Infof("Created table %s", qRepMetadataTableName)
c.logger.Info(fmt.Sprintf("Created table %s", qRepMetadataTableName))
return nil
}

@@ -203,32 +202,24 @@ func (c *SnowflakeConnector) createStage(stageName string, config *protos.QRepCo
// Execute the query
_, err := c.database.Exec(createStageStmt)
if err != nil {
log.WithFields(log.Fields{
"flowName": config.FlowJobName,
}).Errorf("failed to create stage %s: %v", stageName, err)
c.logger.Error(fmt.Sprintf("failed to create stage %s", stageName), slog.Any("error", err))
return fmt.Errorf("failed to create stage %s: %w", stageName, err)
}

log.WithFields(log.Fields{
"flowName": config.FlowJobName,
}).Infof("Created stage %s", stageName)
c.logger.Info(fmt.Sprintf("Created stage %s", stageName))
return nil
}

func (c *SnowflakeConnector) createExternalStage(stageName string, config *protos.QRepConfig) (string, error) {
awsCreds, err := utils.GetAWSSecrets(utils.S3PeerCredentials{})
if err != nil {
log.WithFields(log.Fields{
"flowName": config.FlowJobName,
}).Errorf("failed to get AWS secrets: %v", err)
c.logger.Error("failed to get AWS secrets", slog.Any("error", err))
return "", fmt.Errorf("failed to get AWS secrets: %w", err)
}

s3o, err := utils.NewS3BucketAndPrefix(config.StagingPath)
if err != nil {
log.WithFields(log.Fields{
"flowName": config.FlowJobName,
}).Errorf("failed to extract S3 bucket and prefix: %v", err)
c.logger.Error("failed to extract S3 bucket and prefix", slog.Any("error", err))
return "", fmt.Errorf("failed to extract S3 bucket and prefix: %w", err)
}

@@ -256,25 +247,21 @@ func (c *SnowflakeConnector) createExternalStage(stageName string, config *proto
}

func (c *SnowflakeConnector) ConsolidateQRepPartitions(config *protos.QRepConfig) error {
log.Infof("Consolidating partitions for flow job %s", config.FlowJobName)
c.logger.Error("Consolidating partitions")

destTable := config.DestinationTableIdentifier
stageName := c.getStageNameForJob(config.FlowJobName)

colInfo, err := c.getColsFromTable(destTable)
if err != nil {
log.WithFields(log.Fields{
"flowName": config.FlowJobName,
}).Errorf("failed to get columns from table %s: %v", destTable, err)
c.logger.Error(fmt.Sprintf("failed to get columns from table %s", destTable), slog.Any("error", err))
return fmt.Errorf("failed to get columns from table %s: %w", destTable, err)
}

allCols := colInfo.Columns
err = CopyStageToDestination(c, config, destTable, stageName, allCols)
if err != nil {
log.WithFields(log.Fields{
"flowName": config.FlowJobName,
}).Errorf("failed to copy stage to destination: %v", err)
c.logger.Error("failed to copy stage to destination", slog.Any("error", err))
return fmt.Errorf("failed to copy stage to destination: %w", err)
}

@@ -283,9 +270,7 @@ func (c *SnowflakeConnector) ConsolidateQRepPartitions(config *protos.QRepConfig

// CleanupQRepFlow function for snowflake connector
func (c *SnowflakeConnector) CleanupQRepFlow(config *protos.QRepConfig) error {
log.WithFields(log.Fields{
"flowName": config.FlowJobName,
}).Infof("Cleaning up flow job %s", config.FlowJobName)
c.logger.Error("Cleaning up flow job")
return c.dropStage(config.StagingPath, config.FlowJobName)
}

@@ -347,20 +332,16 @@ func (c *SnowflakeConnector) dropStage(stagingPath string, job string) error {
if strings.HasPrefix(stagingPath, "s3://") {
s3o, err := utils.NewS3BucketAndPrefix(stagingPath)
if err != nil {
log.WithFields(log.Fields{
"flowName": job,
}).Errorf("failed to create S3 bucket and prefix: %v", err)
c.logger.Error("failed to create S3 bucket and prefix", slog.Any("error", err))
return fmt.Errorf("failed to create S3 bucket and prefix: %w", err)
}

log.Infof("Deleting contents of bucket %s with prefix %s/%s", s3o.Bucket, s3o.Prefix, job)
c.logger.Info(fmt.Sprintf("Deleting contents of bucket %s with prefix %s/%s", s3o.Bucket, s3o.Prefix, job))

// deleting the contents of the bucket with prefix
s3svc, err := utils.CreateS3Client(utils.S3PeerCredentials{})
if err != nil {
log.WithFields(log.Fields{
"flowName": job,
}).Errorf("failed to create S3 client: %v", err)
c.logger.Error("failed to create S3 client", slog.Any("error", err))
return fmt.Errorf("failed to create S3 client: %w", err)
}

@@ -373,18 +354,14 @@ func (c *SnowflakeConnector) dropStage(stagingPath string, job string) error {
// Iterate through the objects in the bucket with the prefix and delete them
s3Client := s3manager.NewBatchDeleteWithClient(s3svc)
if err := s3Client.Delete(aws.BackgroundContext(), iter); err != nil {
log.WithFields(log.Fields{
"flowName": job,
}).Errorf("failed to delete objects from bucket: %v", err)
c.logger.Error("failed to delete objects from bucket", slog.Any("error", err))
return fmt.Errorf("failed to delete objects from bucket: %w", err)
}

log.Infof("Deleted contents of bucket %s with prefix %s/%s", s3o.Bucket, s3o.Prefix, job)
c.logger.Info(fmt.Sprintf("Deleted contents of bucket %s with prefix %s/%s", s3o.Bucket, s3o.Prefix, job))
}

log.WithFields(log.Fields{
"flowName": job,
}).Infof("Dropped stage %s", stageName)
c.logger.Info(fmt.Sprintf("Dropped stage %s", stageName))
return nil
}
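In SyncQRepRecords the per-partition fields are bundled once with slog.Group and reused for every log line in the function. A minimal sketch of that grouping, with illustrative key names and values rather than PeerDB's actual identifiers:

```go
package main

import "log/slog"

func main() {
	// Build the group once, then pass it to each call; handlers render the
	// attributes nested under "sync_metadata".
	syncMeta := slog.Group("sync_metadata",
		slog.String("partitionId", "partition-42"),
		slog.String("destinationTable", "PUBLIC.ORDERS"),
	)

	slog.Info("Called QRep sync function and obtained table schema", syncMeta)
	slog.Info("Partition has already been synced", syncMeta)
}
```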

