From 2446e8ed0483f1c346fb3bcc69efe476722074cd Mon Sep 17 00:00:00 2001 From: Kaushik Iska Date: Fri, 31 May 2024 16:13:11 -0400 Subject: [PATCH] Add query tags for Snowflake connector to track most expensive queries (#1776) This commit adds query tags to the Snowflake connector for the most frequently executed and expensive queries that contribute significantly to the overall cost. The tags are added using the `withMirrorNameQueryTag` method, which sets a tag with the format `peerdb-mirror-<mirrorName>` for each query. Query tags have been added to key methods: - `SyncRecords` - `NormalizeRecords` - `CreateRawTable` - `SyncFlowCleanup` - `SyncQRepRecords` - `SetupQRepMetadataTables` - `ConsolidateQRepPartitions` These methods cover core functionality of syncing, normalizing, and managing QRep tasks. The tags will help identify and track the most expensive queries in the Snowflake web interface or using the `QUERY_HISTORY` table. Some minor queries have been left untagged to focus on the most impactful areas. The query tags will aid in monitoring, optimizing, and controlling costs associated with the Snowflake connector. --- flow/connectors/snowflake/qrep.go | 6 ++++++ flow/connectors/snowflake/snowflake.go | 10 ++++++++++ 2 files changed, 16 insertions(+) diff --git a/flow/connectors/snowflake/qrep.go b/flow/connectors/snowflake/qrep.go index daf3e330db..ee3d12c5c7 100644 --- a/flow/connectors/snowflake/qrep.go +++ b/flow/connectors/snowflake/qrep.go @@ -30,6 +30,8 @@ func (c *SnowflakeConnector) SyncQRepRecords( partition *protos.QRepPartition, stream *model.QRecordStream, ) (int, error) { + ctx = c.withMirrorNameQueryTag(ctx, config.FlowJobName) + // Ensure the destination table is available. 
destTable := config.DestinationTableIdentifier flowLog := slog.Group("sync_metadata", @@ -71,6 +73,8 @@ func (c *SnowflakeConnector) getTableSchema(ctx context.Context, tableName strin } func (c *SnowflakeConnector) SetupQRepMetadataTables(ctx context.Context, config *protos.QRepConfig) error { + ctx = c.withMirrorNameQueryTag(ctx, config.FlowJobName) + var schemaExists sql.NullBool err := c.database.QueryRowContext(ctx, checkIfSchemaExistsSQL, c.rawSchema).Scan(&schemaExists) if err != nil { @@ -169,6 +173,8 @@ func (c *SnowflakeConnector) createExternalStage(ctx context.Context, stageName } func (c *SnowflakeConnector) ConsolidateQRepPartitions(ctx context.Context, config *protos.QRepConfig) error { + ctx = c.withMirrorNameQueryTag(ctx, config.FlowJobName) + destTable := config.DestinationTableIdentifier stageName := c.getStageNameForJob(config.FlowJobName) diff --git a/flow/connectors/snowflake/snowflake.go b/flow/connectors/snowflake/snowflake.go index 74ce0942e0..a0887ebfdd 100644 --- a/flow/connectors/snowflake/snowflake.go +++ b/flow/connectors/snowflake/snowflake.go @@ -405,7 +405,13 @@ func (c *SnowflakeConnector) ReplayTableSchemaDeltas( return nil } +func (c *SnowflakeConnector) withMirrorNameQueryTag(ctx context.Context, mirrorName string) context.Context { + return gosnowflake.WithQueryTag(ctx, "peerdb-mirror-"+mirrorName) +} + func (c *SnowflakeConnector) SyncRecords(ctx context.Context, req *model.SyncRecordsRequest[model.RecordItems]) (*model.SyncResponse, error) { + ctx = c.withMirrorNameQueryTag(ctx, req.FlowJobName) + rawTableIdentifier := getRawTableIdentifier(req.FlowJobName) c.logger.Info("pushing records to Snowflake table " + rawTableIdentifier) @@ -468,6 +474,7 @@ func (c *SnowflakeConnector) syncRecordsViaAvro( // NormalizeRecords normalizes raw table to destination table. 
func (c *SnowflakeConnector) NormalizeRecords(ctx context.Context, req *model.NormalizeRecordsRequest) (*model.NormalizeResponse, error) { + ctx = c.withMirrorNameQueryTag(ctx, req.FlowJobName) normBatchID, err := c.GetLastNormalizeBatchID(ctx, req.FlowJobName) if err != nil { return nil, err @@ -583,6 +590,8 @@ func (c *SnowflakeConnector) mergeTablesForBatch( } func (c *SnowflakeConnector) CreateRawTable(ctx context.Context, req *protos.CreateRawTableInput) (*protos.CreateRawTableOutput, error) { + ctx = c.withMirrorNameQueryTag(ctx, req.FlowJobName) + var schemaExists sql.NullBool err := c.database.QueryRowContext(ctx, checkIfSchemaExistsSQL, c.rawSchema).Scan(&schemaExists) if err != nil { @@ -625,6 +634,7 @@ func (c *SnowflakeConnector) CreateRawTable(ctx context.Context, req *protos.Cre } func (c *SnowflakeConnector) SyncFlowCleanup(ctx context.Context, jobName string) error { + ctx = c.withMirrorNameQueryTag(ctx, jobName) err := c.PostgresMetadata.SyncFlowCleanup(ctx, jobName) if err != nil { return fmt.Errorf("[snowflake drop mirror] unable to clear metadata for sync flow cleanup: %w", err)