diff --git a/flow/connectors/snowflake/snowflake.go b/flow/connectors/snowflake/snowflake.go index 164d4b652..57f7f956d 100644 --- a/flow/connectors/snowflake/snowflake.go +++ b/flow/connectors/snowflake/snowflake.go @@ -147,6 +147,7 @@ func NewSnowflakeConnector(ctx context.Context, if snowflakeProtoConfig.MetadataSchema != nil { metadataSchema = *snowflakeProtoConfig.MetadataSchema } + flowName, _ := ctx.Value(shared.FlowNameKey).(string) return &SnowflakeConnector{ ctx: ctx, @@ -1010,7 +1011,20 @@ func (c *SnowflakeConnector) updateNormalizeMetadata(flowJobName string, normali } func (c *SnowflakeConnector) createPeerDBInternalSchema(createSchemaTx *sql.Tx) error { - _, err := createSchemaTx.ExecContext(c.ctx, fmt.Sprintf(createSchemaSQL, c.metadataSchema)) + // check if the internal schema exists + row := createSchemaTx.QueryRowContext(c.ctx, checkSchemaExistsSQL, c.metadataSchema) + var schemaExists pgtype.Bool + err := row.Scan(&schemaExists) + if err != nil { + return fmt.Errorf("error while reading result row: %w", err) + } + + if schemaExists.Bool { + c.logger.Info(fmt.Sprintf("internal schema %s already exists", c.metadataSchema)) + return nil + } + + _, err = createSchemaTx.ExecContext(c.ctx, fmt.Sprintf(createSchemaSQL, c.metadataSchema)) if err != nil { return fmt.Errorf("error while creating internal schema for PeerDB: %w", err) }