Skip to content

Commit

Permalink
Merge branch 'main' into fix-writemode
Browse files Browse the repository at this point in the history
  • Loading branch information
iskakaushik authored Oct 5, 2023
2 parents 4c13df4 + 5b9f98d commit a7eea0f
Show file tree
Hide file tree
Showing 30 changed files with 1,033 additions and 283 deletions.
5 changes: 3 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ on:
push:
branches: [main, release/*]
pull_request:
branches: [main]
branches:
- "main"
- "release/*"

jobs:
build:
Expand All @@ -31,7 +33,6 @@ jobs:
- uses: actions/checkout@v3
with:
submodules: recursive
token: ${{ secrets.SUBMODULE_CHECKOUT }}

- uses: dtolnay/rust-toolchain@stable

Expand Down
48 changes: 0 additions & 48 deletions .github/workflows/dev-debian.yml

This file was deleted.

5 changes: 1 addition & 4 deletions .github/workflows/dev-docker.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,12 @@ name: Dev Docker Images
on:
push:
branches: [main]
pull_request:
branches: [main]

jobs:
docker-build:
strategy:
matrix:
runner: [ubuntu-latest, ubicloud]
runner: [ubicloud]
runs-on: ${{ matrix.runner }}
permissions:
contents: read
Expand All @@ -20,7 +18,6 @@ jobs:
uses: actions/checkout@v3
with:
submodules: recursive
token: ${{ secrets.SUBMODULE_CHECKOUT }}

- uses: depot/setup-action@v1

Expand Down
7 changes: 4 additions & 3 deletions .github/workflows/flow.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@ name: Flow build and test

on:
pull_request:
branches: [main]
branches:
- "main"
push:
branches: [main]

Expand Down Expand Up @@ -34,7 +35,7 @@ jobs:

- uses: actions/setup-go@v3
with:
go-version: '>=1.19.0'
go-version: ">=1.19.0"

- name: install gotestsum
run: |
Expand Down Expand Up @@ -73,7 +74,7 @@ jobs:

- name: run tests
run: |
gotestsum --format testname -- -p 4 ./... -timeout 1200s
gotestsum --format testname -- -p 4 ./... -timeout 2400s
working-directory: ./flow
env:
AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }}
Expand Down
8 changes: 5 additions & 3 deletions .github/workflows/golang-lint.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
name: GolangCI-Lint

on: [pull_request]
on:
pull_request:
branches:
- "main"

jobs:
golangci-lint:
Expand All @@ -10,14 +13,13 @@ jobs:
pull-requests: write
strategy:
matrix:
runner: [ubuntu-latest, ubicloud]
runner: [ubicloud]
runs-on: ${{ matrix.runner }}
steps:
- name: checkout
uses: actions/checkout@v3
with:
submodules: recursive
token: ${{ secrets.SUBMODULE_CHECKOUT }}

- name: golangci-lint
uses: reviewdog/action-golangci-lint@v2
Expand Down
34 changes: 17 additions & 17 deletions .github/workflows/rust-lint.yml
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
name: clippy-action
on: [pull_request]

on:
pull_request:
branches:
- "main"

jobs:
clippy:
permissions:
Expand All @@ -8,23 +13,18 @@ jobs:
pull-requests: write
strategy:
matrix:
runner: [ubuntu-latest, ubicloud]
runner: [ubicloud-standard-4]
runs-on: ${{ matrix.runner }}
steps:
- name: checkout
uses: actions/checkout@v3
with:
submodules: recursive
token: ${{ secrets.SUBMODULE_CHECKOUT }}
- name: checkout
uses: actions/checkout@v3
with:
submodules: recursive

- uses: dtolnay/rust-toolchain@stable
with:
components: clippy
- uses: dtolnay/rust-toolchain@stable
with:
components: clippy

- uses: giraffate/clippy-action@v1
with:
reporter: 'github-pr-review'
github_token: ${{ secrets.GITHUB_TOKEN }}
workdir: ./nexus
env:
REVIEWDOG_TOKEN: ${{ secrets.REVIEWDOG_TOKEN }}
- name: clippy
run: cargo clippy -- -D warnings
working-directory: ./nexus
62 changes: 0 additions & 62 deletions .github/workflows/stable-debian.yml

This file was deleted.

3 changes: 1 addition & 2 deletions .github/workflows/stable-docker.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ jobs:
docker-build:
strategy:
matrix:
runner: [ubuntu-latest, ubicloud]
runner: [ubicloud]
runs-on: ${{ matrix.runner }}
permissions:
contents: read
Expand All @@ -19,7 +19,6 @@ jobs:
uses: actions/checkout@v3
with:
submodules: recursive
token: ${{ secrets.SUBMODULE_CHECKOUT }}

- uses: depot/setup-action@v1

Expand Down
4 changes: 2 additions & 2 deletions .github/workflows/ui-build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ jobs:
name: Build & Test UI
strategy:
matrix:
runner: [ubuntu-latest, ubicloud]
runner: [ubicloud]
runs-on: ${{ matrix.runner }}
steps:
- name: checkout
Expand All @@ -25,4 +25,4 @@ jobs:

- name: Build
working-directory: ui
run: yarn build
run: yarn build
4 changes: 2 additions & 2 deletions .github/workflows/ui-lint.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ jobs:
name: Run UI linters
strategy:
matrix:
runner: [ubuntu-latest, ubicloud]
runner: [ubicloud]
runs-on: ${{ matrix.runner }}
steps:
- name: checkout
Expand All @@ -33,4 +33,4 @@ jobs:
eslint: true
prettier: true
eslint_dir: ui
prettier_dir: ui
prettier_dir: ui
7 changes: 4 additions & 3 deletions flow/connectors/bigquery/bigquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ func (c *BigQueryConnector) ReplayTableSchemaDelta(flowJobName string,
}

for _, droppedColumn := range schemaDelta.DroppedColumns {
_, err := c.client.Query(fmt.Sprintf("ALTER TABLE %s.%s DROP COLUMN %s", c.datasetID,
_, err := c.client.Query(fmt.Sprintf("ALTER TABLE %s.%s DROP COLUMN `%s`", c.datasetID,
schemaDelta.DstTableName, droppedColumn)).Read(c.ctx)
if err != nil {
return fmt.Errorf("failed to drop column %s for table %s: %w", droppedColumn,
Expand All @@ -226,8 +226,9 @@ func (c *BigQueryConnector) ReplayTableSchemaDelta(flowJobName string,
}).Infof("[schema delta replay] dropped column %s", droppedColumn)
}
for _, addedColumn := range schemaDelta.AddedColumns {
_, err := c.client.Query(fmt.Sprintf("ALTER TABLE %s.%s ADD COLUMN %s %s", c.datasetID,
schemaDelta.DstTableName, addedColumn.ColumnName, addedColumn.ColumnType)).Read(c.ctx)
_, err := c.client.Query(fmt.Sprintf("ALTER TABLE %s.%s ADD COLUMN `%s` %s", c.datasetID,
schemaDelta.DstTableName, addedColumn.ColumnName,
qValueKindToBigQueryType(addedColumn.ColumnType))).Read(c.ctx)
if err != nil {
return fmt.Errorf("failed to add column %s for table %s: %w", addedColumn.ColumnName,
schemaDelta.SrcTableName, err)
Expand Down
24 changes: 18 additions & 6 deletions flow/connectors/postgres/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -545,13 +545,17 @@ func (p *PostgresCDCSource) processRelationMessage(
// retrieve initial RelationMessage for table changed.
prevRel := p.relationMessageMapping[currRel.RelationId]
// creating maps for lookup later
prevRelMap := make(map[string]bool)
currRelMap := make(map[string]bool)
prevRelMap := make(map[string]*protos.PostgresTableIdentifier)
currRelMap := make(map[string]*protos.PostgresTableIdentifier)
for _, column := range prevRel.Columns {
prevRelMap[column.Name] = true
prevRelMap[column.Name] = &protos.PostgresTableIdentifier{
RelId: column.DataType,
}
}
for _, column := range currRel.Columns {
currRelMap[column.Name] = true
currRelMap[column.Name] = &protos.PostgresTableIdentifier{
RelId: column.DataType,
}
}

schemaDelta := &protos.TableSchemaDelta{
Expand All @@ -564,7 +568,15 @@ func (p *PostgresCDCSource) processRelationMessage(
}
for _, column := range currRel.Columns {
// not present in previous relation message, but in current one, so added.
if !prevRelMap[column.Name] {
if prevRelMap[column.Name] == nil {
schemaDelta.AddedColumns = append(schemaDelta.AddedColumns, &protos.DeltaAddedColumn{
ColumnName: column.Name,
ColumnType: string(postgresOIDToQValueKind(column.DataType)),
})
// present in previous and current relation messages, but data types have changed.
// so we add it to AddedColumns and DroppedColumns, knowing that we process DroppedColumns first.
} else if prevRelMap[column.Name].RelId != currRelMap[column.Name].RelId {
schemaDelta.DroppedColumns = append(schemaDelta.DroppedColumns, column.Name)
schemaDelta.AddedColumns = append(schemaDelta.AddedColumns, &protos.DeltaAddedColumn{
ColumnName: column.Name,
ColumnType: string(postgresOIDToQValueKind(column.DataType)),
Expand All @@ -573,7 +585,7 @@ func (p *PostgresCDCSource) processRelationMessage(
}
for _, column := range prevRel.Columns {
// present in previous relation message, but not in current one, so dropped.
if !currRelMap[column.Name] {
if currRelMap[column.Name] == nil {
schemaDelta.DroppedColumns = append(schemaDelta.DroppedColumns, column.Name)
}
}
Expand Down
4 changes: 2 additions & 2 deletions flow/connectors/postgres/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -516,10 +516,10 @@ func (c *PostgresConnector) generateMergeStatement(destinationTableIdentifier st
pgType := qValueKindToPostgresType(genericColumnType)
if strings.Contains(genericColumnType, "array") {
flattenedCastsSQLArray = append(flattenedCastsSQLArray,
fmt.Sprintf("ARRAY(SELECT * FROM JSON_ARRAY_ELEMENTS_TEXT((_peerdb_data->>'%s')::JSON))::%s AS %s",
fmt.Sprintf("ARRAY(SELECT * FROM JSON_ARRAY_ELEMENTS_TEXT((_peerdb_data->>'%s')::JSON))::%s AS \"%s\"",
strings.Trim(columnName, "\""), pgType, columnName))
} else {
flattenedCastsSQLArray = append(flattenedCastsSQLArray, fmt.Sprintf("(_peerdb_data->>'%s')::%s AS %s",
flattenedCastsSQLArray = append(flattenedCastsSQLArray, fmt.Sprintf("(_peerdb_data->>'%s')::%s AS \"%s\"",
strings.Trim(columnName, "\""), pgType, columnName))
}
if normalizedTableSchema.PrimaryKeyColumn == columnName {
Expand Down
Loading

0 comments on commit a7eea0f

Please sign in to comment.