From 439b176c63af7b8f5ec93c40d8066287b3dce862 Mon Sep 17 00:00:00 2001 From: Kevin Biju Date: Tue, 12 Dec 2023 22:56:12 +0530 Subject: [PATCH] adding IF NOT EXISTS for pg and bq --- flow/connectors/bigquery/bigquery.go | 3 ++- flow/connectors/postgres/postgres.go | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/flow/connectors/bigquery/bigquery.go b/flow/connectors/bigquery/bigquery.go index f1ad468c5b..aa2e7750b5 100644 --- a/flow/connectors/bigquery/bigquery.go +++ b/flow/connectors/bigquery/bigquery.go @@ -237,7 +237,8 @@ func (c *BigQueryConnector) ReplayTableSchemaDeltas(flowJobName string, } for _, addedColumn := range schemaDelta.AddedColumns { - _, err := c.client.Query(fmt.Sprintf("ALTER TABLE %s.%s ADD COLUMN `%s` %s", c.datasetID, + _, err := c.client.Query(fmt.Sprintf( + "ALTER TABLE %s.%s ADD COLUMN IF NOT EXISTS `%s` %s", c.datasetID, schemaDelta.DstTableName, addedColumn.ColumnName, qValueKindToBigQueryType(addedColumn.ColumnType))).Read(c.ctx) if err != nil { diff --git a/flow/connectors/postgres/postgres.go b/flow/connectors/postgres/postgres.go index 2bf7fb7d04..2858841a0d 100644 --- a/flow/connectors/postgres/postgres.go +++ b/flow/connectors/postgres/postgres.go @@ -712,7 +712,8 @@ func (c *PostgresConnector) ReplayTableSchemaDeltas(flowJobName string, } for _, addedColumn := range schemaDelta.AddedColumns { - _, err = tableSchemaModifyTx.Exec(c.ctx, fmt.Sprintf("ALTER TABLE %s ADD COLUMN \"%s\" %s", + _, err = tableSchemaModifyTx.Exec(c.ctx, fmt.Sprintf( + "ALTER TABLE %s ADD COLUMN IF NOT EXISTS \"%s\" %s", schemaDelta.DstTableName, addedColumn.ColumnName, qValueKindToPostgresType(addedColumn.ColumnType))) if err != nil {