diff --git a/flow/connectors/snowflake/qrep.go b/flow/connectors/snowflake/qrep.go index daf3e330db..ee3d12c5c7 100644 --- a/flow/connectors/snowflake/qrep.go +++ b/flow/connectors/snowflake/qrep.go @@ -30,6 +30,8 @@ func (c *SnowflakeConnector) SyncQRepRecords( partition *protos.QRepPartition, stream *model.QRecordStream, ) (int, error) { + ctx = c.withMirrorNameQueryTag(ctx, config.FlowJobName) + // Ensure the destination table is available. destTable := config.DestinationTableIdentifier flowLog := slog.Group("sync_metadata", @@ -71,6 +73,8 @@ func (c *SnowflakeConnector) getTableSchema(ctx context.Context, tableName strin } func (c *SnowflakeConnector) SetupQRepMetadataTables(ctx context.Context, config *protos.QRepConfig) error { + ctx = c.withMirrorNameQueryTag(ctx, config.FlowJobName) + var schemaExists sql.NullBool err := c.database.QueryRowContext(ctx, checkIfSchemaExistsSQL, c.rawSchema).Scan(&schemaExists) if err != nil { @@ -169,6 +173,8 @@ func (c *SnowflakeConnector) createExternalStage(ctx context.Context, stageName } func (c *SnowflakeConnector) ConsolidateQRepPartitions(ctx context.Context, config *protos.QRepConfig) error { + ctx = c.withMirrorNameQueryTag(ctx, config.FlowJobName) + destTable := config.DestinationTableIdentifier stageName := c.getStageNameForJob(config.FlowJobName) diff --git a/flow/connectors/snowflake/snowflake.go b/flow/connectors/snowflake/snowflake.go index 74ce0942e0..a0887ebfdd 100644 --- a/flow/connectors/snowflake/snowflake.go +++ b/flow/connectors/snowflake/snowflake.go @@ -405,7 +405,13 @@ func (c *SnowflakeConnector) ReplayTableSchemaDeltas( return nil } +func (c *SnowflakeConnector) withMirrorNameQueryTag(ctx context.Context, mirrorName string) context.Context { + return gosnowflake.WithQueryTag(ctx, "peerdb-mirror-"+mirrorName) +} + func (c *SnowflakeConnector) SyncRecords(ctx context.Context, req *model.SyncRecordsRequest[model.RecordItems]) (*model.SyncResponse, error) { + ctx = c.withMirrorNameQueryTag(ctx, req.FlowJobName) + rawTableIdentifier := getRawTableIdentifier(req.FlowJobName) c.logger.Info("pushing records to Snowflake table " + rawTableIdentifier) @@ -468,6 +474,7 @@ func (c *SnowflakeConnector) syncRecordsViaAvro( // NormalizeRecords normalizes raw table to destination table. func (c *SnowflakeConnector) NormalizeRecords(ctx context.Context, req *model.NormalizeRecordsRequest) (*model.NormalizeResponse, error) { + ctx = c.withMirrorNameQueryTag(ctx, req.FlowJobName) normBatchID, err := c.GetLastNormalizeBatchID(ctx, req.FlowJobName) if err != nil { return nil, err @@ -583,6 +590,8 @@ func (c *SnowflakeConnector) mergeTablesForBatch( } func (c *SnowflakeConnector) CreateRawTable(ctx context.Context, req *protos.CreateRawTableInput) (*protos.CreateRawTableOutput, error) { + ctx = c.withMirrorNameQueryTag(ctx, req.FlowJobName) + var schemaExists sql.NullBool err := c.database.QueryRowContext(ctx, checkIfSchemaExistsSQL, c.rawSchema).Scan(&schemaExists) if err != nil { @@ -625,6 +634,7 @@ func (c *SnowflakeConnector) CreateRawTable(ctx context.Context, req *protos.Cre } func (c *SnowflakeConnector) SyncFlowCleanup(ctx context.Context, jobName string) error { + ctx = c.withMirrorNameQueryTag(ctx, jobName) err := c.PostgresMetadata.SyncFlowCleanup(ctx, jobName) if err != nil { return fmt.Errorf("[snowflake drop mirror] unable to clear metadata for sync flow cleanup: %w", err)