From 8b63e8ffd8992ae8fb9d8be0ff3af475501b74e6 Mon Sep 17 00:00:00 2001 From: Kaushik Iska Date: Thu, 28 Dec 2023 12:58:50 -0500 Subject: [PATCH] Check if a schema already exists and not create it --- flow/connectors/snowflake/snowflake.go | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/flow/connectors/snowflake/snowflake.go b/flow/connectors/snowflake/snowflake.go index 164d4b6527..57f7f956d7 100644 --- a/flow/connectors/snowflake/snowflake.go +++ b/flow/connectors/snowflake/snowflake.go @@ -147,6 +147,7 @@ func NewSnowflakeConnector(ctx context.Context, if snowflakeProtoConfig.MetadataSchema != nil { metadataSchema = *snowflakeProtoConfig.MetadataSchema } + flowName, _ := ctx.Value(shared.FlowNameKey).(string) return &SnowflakeConnector{ ctx: ctx, @@ -1010,7 +1011,20 @@ func (c *SnowflakeConnector) updateNormalizeMetadata(flowJobName string, normali } func (c *SnowflakeConnector) createPeerDBInternalSchema(createSchemaTx *sql.Tx) error { - _, err := createSchemaTx.ExecContext(c.ctx, fmt.Sprintf(createSchemaSQL, c.metadataSchema)) + // check if the internal schema exists + row := createSchemaTx.QueryRowContext(c.ctx, checkSchemaExistsSQL, c.metadataSchema) + var schemaExists pgtype.Bool + err := row.Scan(&schemaExists) + if err != nil { + return fmt.Errorf("error while reading result row: %w", err) + } + + if schemaExists.Bool { + c.logger.Info(fmt.Sprintf("internal schema %s already exists", c.metadataSchema)) + return nil + } + + _, err = createSchemaTx.ExecContext(c.ctx, fmt.Sprintf(createSchemaSQL, c.metadataSchema)) if err != nil { return fmt.Errorf("error while creating internal schema for PeerDB: %w", err) }