Skip to content

Commit

Permalink
Merge branch 'main' into pgwire-17
Browse files Browse the repository at this point in the history
  • Loading branch information
serprex authored Dec 8, 2023
2 parents a9e5be0 + c269ce9 commit 1630da6
Show file tree
Hide file tree
Showing 29 changed files with 582 additions and 531 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ jobs:
build:
strategy:
matrix:
runner: [ubicloud-standard-8]
runner: [ubicloud-standard-8-ubuntu-2204-arm]
runs-on: ${{ matrix.runner }}
timeout-minutes: 30
services:
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/customer-docker.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ jobs:
docker-build:
strategy:
matrix:
runner: [ubicloud]
runner: [ubicloud-standard-2-ubuntu-2204-arm]
runs-on: ${{ matrix.runner }}
permissions:
contents: read
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/dev-docker.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ jobs:
docker-build:
strategy:
matrix:
runner: [ubicloud]
runner: [ubuntu-latest]
runs-on: ${{ matrix.runner }}
permissions:
contents: read
Expand Down
6 changes: 3 additions & 3 deletions .github/workflows/flow.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,12 @@ jobs:
flow_test:
strategy:
matrix:
runner: [ubicloud-standard-8, ubuntu-latest]
runner: [ubicloud-standard-8-ubuntu-2204-arm]
runs-on: ${{ matrix.runner }}
timeout-minutes: 30
timeout-minutes: 40
services:
pg_cdc:
image: postgis/postgis:15-3.4-alpine
image: imresamu/postgis:15-3.4-alpine
ports:
- 7132:5432
env:
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/golang-lint.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ jobs:
pull-requests: write
strategy:
matrix:
runner: [ubicloud]
runner: [ubicloud-standard-2-ubuntu-2204-arm]
runs-on: ${{ matrix.runner }}
steps:
- name: checkout
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/rust-lint.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ jobs:
pull-requests: write
strategy:
matrix:
runner: [ubicloud-standard-4]
runner: [ubicloud-standard-4-ubuntu-2204-arm]
runs-on: ${{ matrix.runner }}
steps:
- name: checkout
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/stable-docker.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ jobs:
docker-build:
strategy:
matrix:
runner: [ubicloud]
runner: [ubicloud-standard-2-ubuntu-2204-arm]
runs-on: ${{ matrix.runner }}
permissions:
contents: read
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/ui-build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ jobs:
name: Build & Test UI
strategy:
matrix:
runner: [ubicloud]
runner: [ubicloud-standard-2-ubuntu-2204-arm]
runs-on: ${{ matrix.runner }}
steps:
- name: checkout
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/ui-lint.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ jobs:
name: Run UI linters
strategy:
matrix:
runner: [ubicloud]
runner: [ubicloud-standard-2-ubuntu-2204-arm]
runs-on: ${{ matrix.runner }}
steps:
- name: checkout
Expand Down
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,6 @@ tmp/
private/
nexus/server/tests/assets/*.json
nexus/server/tests/results/actual/

go.work
go.work.sum
47 changes: 26 additions & 21 deletions flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,50 +160,55 @@ func (a *FlowableActivity) CreateNormalizedTable(
return conn.SetupNormalizedTables(config)
}

func (a *FlowableActivity) handleSlotInfo(
ctx context.Context,
srcConn connectors.CDCPullConnector,
slotName string,
peerName string,
) error {
slotInfo, err := srcConn.GetSlotInfo(slotName)
if err != nil {
log.Warnf("warning: failed to get slot info: %v", err)
return err
}

if len(slotInfo) != 0 {
return a.CatalogMirrorMonitor.AppendSlotSizeInfo(ctx, peerName, slotInfo[0])
}
return nil
}

func (a *FlowableActivity) recordSlotSizePeriodically(
ctx context.Context,
srcConn connectors.CDCPullConnector,
slotName string,
done <-chan struct{},
peerName string,
) {

timeout := 10 * time.Minute
ticker := time.NewTicker(timeout)

defer ticker.Stop()
for {
slotInfo, err := srcConn.GetSlotInfo(slotName)
if err != nil {
log.Warnf("warning: failed to get slot info: %v", err)
}

if len(slotInfo) == 0 {
continue
}

select {
case <-ticker.C:
a.CatalogMirrorMonitor.AppendSlotSizeInfo(ctx, peerName, slotInfo[0])
case <-done:
a.CatalogMirrorMonitor.AppendSlotSizeInfo(ctx, peerName, slotInfo[0])
err := a.handleSlotInfo(ctx, srcConn, slotName, peerName)
if err != nil {
return
}
case <-ctx.Done():
return
}
ticker.Stop()
ticker = time.NewTicker(timeout)
}

}

// StartFlow implements StartFlow.
func (a *FlowableActivity) StartFlow(ctx context.Context,
input *protos.StartFlowInput) (*model.SyncResponse, error) {
activity.RecordHeartbeat(ctx, "starting flow...")
done := make(chan struct{})
defer close(done)
conn := input.FlowConnectionConfigs

ctx = context.WithValue(ctx, shared.CDCMirrorMonitorKey, a.CatalogMirrorMonitor)

dstConn, err := connectors.GetCDCSyncConnector(ctx, conn.Destination)
if err != nil {
return nil, fmt.Errorf("failed to get destination connector: %w", err)
Expand Down Expand Up @@ -246,7 +251,8 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,
slotNameForMetrics = input.FlowConnectionConfigs.ReplicationSlotName
}

go a.recordSlotSizePeriodically(ctx, srcConn, slotNameForMetrics, done, input.FlowConnectionConfigs.Source.Name)
go a.recordSlotSizePeriodically(errCtx, srcConn, slotNameForMetrics, input.FlowConnectionConfigs.Source.Name)

// start a goroutine to pull records from the source
errGroup.Go(func() error {
return srcConn.PullRecords(&model.PullRecordsRequest{
Expand Down Expand Up @@ -371,7 +377,6 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,

pushedRecordsWithCount := fmt.Sprintf("pushed %d records", numRecords)
activity.RecordHeartbeat(ctx, pushedRecordsWithCount)
done <- struct{}{}

return res, nil
}
Expand Down
Loading

0 comments on commit 1630da6

Please sign in to comment.