Skip to content

Commit

Permalink
Check if a schema already exists and not create it
Browse files Browse the repository at this point in the history
  • Loading branch information
iskakaushik committed Dec 28, 2023
1 parent ec9486e commit 8b63e8f
Showing 1 changed file with 15 additions and 1 deletion.
16 changes: 15 additions & 1 deletion flow/connectors/snowflake/snowflake.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ func NewSnowflakeConnector(ctx context.Context,
if snowflakeProtoConfig.MetadataSchema != nil {
metadataSchema = *snowflakeProtoConfig.MetadataSchema
}

flowName, _ := ctx.Value(shared.FlowNameKey).(string)
return &SnowflakeConnector{
ctx: ctx,
Expand Down Expand Up @@ -1010,7 +1011,20 @@ func (c *SnowflakeConnector) updateNormalizeMetadata(flowJobName string, normali
}

func (c *SnowflakeConnector) createPeerDBInternalSchema(createSchemaTx *sql.Tx) error {
_, err := createSchemaTx.ExecContext(c.ctx, fmt.Sprintf(createSchemaSQL, c.metadataSchema))
// check if the internal schema exists
row := createSchemaTx.QueryRowContext(c.ctx, checkSchemaExistsSQL, c.metadataSchema)
var schemaExists pgtype.Bool
err := row.Scan(&schemaExists)
if err != nil {
return fmt.Errorf("error while reading result row: %w", err)
}

if schemaExists.Bool {
c.logger.Info(fmt.Sprintf("internal schema %s already exists", c.metadataSchema))
return nil
}

_, err = createSchemaTx.ExecContext(c.ctx, fmt.Sprintf(createSchemaSQL, c.metadataSchema))
if err != nil {
return fmt.Errorf("error while creating internal schema for PeerDB: %w", err)
}
Expand Down

0 comments on commit 8b63e8f

Please sign in to comment.