Skip to content

Commit

Permalink
Add query tags for Snowflake connector to track most expensive queries (
Browse files Browse the repository at this point in the history
#1776)

This commit adds query tags to the Snowflake connector for the most
frequently executed and expensive queries that contribute significantly
to the overall cost. The tags are added using the
`withMirrorNameQueryTag` method, which sets a tag with the format
`peerdb-mirror-<flowJobName>` for each query.

Query tags have been added to key methods:
- `SyncRecords`
- `NormalizeRecords`
- `CreateRawTable`
- `SyncFlowCleanup`
- `SyncQRepRecords`
- `SetupQRepMetadataTables`
- `ConsolidateQRepPartitions`

These methods cover core functionality of syncing, normalizing, and
managing QRep tasks. The tags will help identify and track the most
expensive queries in the Snowflake web interface or using the
`QUERY_HISTORY` table.

Some minor queries have been ignored to focus on the most impactful
areas. The query tags will aid in monitoring, optimizing, and
controlling costs associated with the Snowflake connector.
  • Loading branch information
iskakaushik authored May 31, 2024
1 parent e910b25 commit 2446e8e
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 0 deletions.
6 changes: 6 additions & 0 deletions flow/connectors/snowflake/qrep.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ func (c *SnowflakeConnector) SyncQRepRecords(
partition *protos.QRepPartition,
stream *model.QRecordStream,
) (int, error) {
ctx = c.withMirrorNameQueryTag(ctx, config.FlowJobName)

// Ensure the destination table is available.
destTable := config.DestinationTableIdentifier
flowLog := slog.Group("sync_metadata",
Expand Down Expand Up @@ -71,6 +73,8 @@ func (c *SnowflakeConnector) getTableSchema(ctx context.Context, tableName strin
}

func (c *SnowflakeConnector) SetupQRepMetadataTables(ctx context.Context, config *protos.QRepConfig) error {
ctx = c.withMirrorNameQueryTag(ctx, config.FlowJobName)

var schemaExists sql.NullBool
err := c.database.QueryRowContext(ctx, checkIfSchemaExistsSQL, c.rawSchema).Scan(&schemaExists)
if err != nil {
Expand Down Expand Up @@ -169,6 +173,8 @@ func (c *SnowflakeConnector) createExternalStage(ctx context.Context, stageName
}

func (c *SnowflakeConnector) ConsolidateQRepPartitions(ctx context.Context, config *protos.QRepConfig) error {
ctx = c.withMirrorNameQueryTag(ctx, config.FlowJobName)

destTable := config.DestinationTableIdentifier
stageName := c.getStageNameForJob(config.FlowJobName)

Expand Down
10 changes: 10 additions & 0 deletions flow/connectors/snowflake/snowflake.go
Original file line number Diff line number Diff line change
Expand Up @@ -405,7 +405,13 @@ func (c *SnowflakeConnector) ReplayTableSchemaDeltas(
return nil
}

func (c *SnowflakeConnector) withMirrorNameQueryTag(ctx context.Context, mirrorName string) context.Context {
return gosnowflake.WithQueryTag(ctx, "peerdb-mirror-"+mirrorName)
}

func (c *SnowflakeConnector) SyncRecords(ctx context.Context, req *model.SyncRecordsRequest[model.RecordItems]) (*model.SyncResponse, error) {
ctx = c.withMirrorNameQueryTag(ctx, req.FlowJobName)

rawTableIdentifier := getRawTableIdentifier(req.FlowJobName)
c.logger.Info("pushing records to Snowflake table " + rawTableIdentifier)

Expand Down Expand Up @@ -468,6 +474,7 @@ func (c *SnowflakeConnector) syncRecordsViaAvro(

// NormalizeRecords normalizes raw table to destination table.
func (c *SnowflakeConnector) NormalizeRecords(ctx context.Context, req *model.NormalizeRecordsRequest) (*model.NormalizeResponse, error) {
ctx = c.withMirrorNameQueryTag(ctx, req.FlowJobName)
normBatchID, err := c.GetLastNormalizeBatchID(ctx, req.FlowJobName)
if err != nil {
return nil, err
Expand Down Expand Up @@ -583,6 +590,8 @@ func (c *SnowflakeConnector) mergeTablesForBatch(
}

func (c *SnowflakeConnector) CreateRawTable(ctx context.Context, req *protos.CreateRawTableInput) (*protos.CreateRawTableOutput, error) {
ctx = c.withMirrorNameQueryTag(ctx, req.FlowJobName)

var schemaExists sql.NullBool
err := c.database.QueryRowContext(ctx, checkIfSchemaExistsSQL, c.rawSchema).Scan(&schemaExists)
if err != nil {
Expand Down Expand Up @@ -625,6 +634,7 @@ func (c *SnowflakeConnector) CreateRawTable(ctx context.Context, req *protos.Cre
}

func (c *SnowflakeConnector) SyncFlowCleanup(ctx context.Context, jobName string) error {
ctx = c.withMirrorNameQueryTag(ctx, jobName)
err := c.PostgresMetadata.SyncFlowCleanup(ctx, jobName)
if err != nil {
return fmt.Errorf("[snowflake drop mirror] unable to clear metadata for sync flow cleanup: %w", err)
Expand Down

0 comments on commit 2446e8e

Please sign in to comment.