Skip to content

Commit

Permalink
Merge branch 'main' into saisrirampur-patch-4
Browse files Browse the repository at this point in the history
  • Loading branch information
serprex authored Nov 9, 2023
2 parents f15f162 + 319cde6 commit 31fc870
Show file tree
Hide file tree
Showing 129 changed files with 27,402 additions and 15,955 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ jobs:
--health-timeout 5s
--health-retries 5
steps:
- uses: actions/checkout@v3
- uses: actions/checkout@v4
with:
submodules: recursive

Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/customer-docker.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ jobs:
packages: write
steps:
- name: checkout
uses: actions/checkout@v3
uses: actions/checkout@v4
with:
submodules: recursive

Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/dev-docker.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ jobs:
packages: write
steps:
- name: checkout
uses: actions/checkout@v3
uses: actions/checkout@v4
with:
submodules: recursive

Expand Down
4 changes: 2 additions & 2 deletions .github/workflows/flow.yml
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,11 @@ jobs:
--health-retries 5
steps:
- name: checkout sources
uses: actions/checkout@v3
uses: actions/checkout@v4

- uses: actions/setup-go@v3
with:
go-version: ">=1.19.0"
go-version: ">=1.21.0"

- name: install gotestsum
run: |
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/golang-lint.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ jobs:
runs-on: ${{ matrix.runner }}
steps:
- name: checkout
uses: actions/checkout@v3
uses: actions/checkout@v4
with:
submodules: recursive

Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/rust-lint.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ jobs:
runs-on: ${{ matrix.runner }}
steps:
- name: checkout
uses: actions/checkout@v3
uses: actions/checkout@v4
with:
submodules: recursive

Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/stable-docker.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ jobs:
packages: write
steps:
- name: checkout
uses: actions/checkout@v3
uses: actions/checkout@v4
with:
submodules: recursive

Expand Down
6 changes: 3 additions & 3 deletions .github/workflows/ui-build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@ jobs:
runs-on: ${{ matrix.runner }}
steps:
- name: checkout
uses: actions/checkout@v3
uses: actions/checkout@v4

- name: Install Node.js dependencies
working-directory: ui
run: yarn install --frozen-lockfile
run: npm ci

- name: Build
working-directory: ui
run: yarn build
run: npm run build
6 changes: 4 additions & 2 deletions .github/workflows/ui-lint.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,11 @@ jobs:
runs-on: ${{ matrix.runner }}
steps:
- name: checkout
uses: actions/checkout@v3
uses: actions/checkout@v4

- name: Install Node.js dependencies
working-directory: ui
run: yarn install --frozen-lockfile
run: npm ci

- name: lint
uses: wearerequired/lint-action@v2
Expand All @@ -34,3 +34,5 @@ jobs:
prettier: true
eslint_dir: ui
prettier_dir: ui
eslint_args: "--max-warnings 0"
eslint_extensions: js,ts,jsx,tsx
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
.vscode
.vscode/
.env
tmp/
.envrc
.idea/
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ bash ./run-peerdb.sh
# OR for local development, images will be built locally:
bash ./dev-peerdb.sh

# connect to peerdb and query away (Use psql version >=14.0, <16.0)
# connect to peerdb and query away (Use psql version >=14.0)
psql "port=9900 host=localhost password=peerdb"
```

Expand Down
26 changes: 24 additions & 2 deletions flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -680,7 +680,7 @@ func (a *FlowableActivity) SendWALHeartbeat(ctx context.Context, config *protos.

func (a *FlowableActivity) QRepWaitUntilNewRows(ctx context.Context,
config *protos.QRepConfig, last *protos.QRepPartition) error {
if config.SourcePeer.Type != protos.DBType_POSTGRES {
if config.SourcePeer.Type != protos.DBType_POSTGRES || last.Range == nil {
return nil
}
waitBetweenBatches := 5 * time.Second
Expand Down Expand Up @@ -723,7 +723,8 @@ func (a *FlowableActivity) QRepWaitUntilNewRows(ctx context.Context,
return nil
}

func (a *FlowableActivity) RenameTables(ctx context.Context, config *protos.RenameTablesInput) (*protos.RenameTablesOutput, error) {
func (a *FlowableActivity) RenameTables(ctx context.Context, config *protos.RenameTablesInput) (
*protos.RenameTablesOutput, error) {
dstConn, err := connectors.GetCDCSyncConnector(ctx, config.Peer)
if err != nil {
return nil, fmt.Errorf("failed to get connector: %w", err)
Expand All @@ -742,3 +743,24 @@ func (a *FlowableActivity) RenameTables(ctx context.Context, config *protos.Rena

return sfConn.RenameTables(config)
}

// CreateTablesFromExisting creates destination tables mirroring the schemas of
// existing tables, as described by req. Only Snowflake destination peers are
// supported; any other peer type is rejected.
func (a *FlowableActivity) CreateTablesFromExisting(ctx context.Context, req *protos.CreateTablesFromExistingInput) (
	*protos.CreateTablesFromExistingOutput, error) {
	// Reject unsupported destinations up front, before opening a connector we
	// would immediately have to close again.
	if req.Peer.Type != protos.DBType_SNOWFLAKE {
		return nil, fmt.Errorf("create tables from existing is only supported on snowflake")
	}

	dstConn, err := connectors.GetCDCSyncConnector(ctx, req.Peer)
	if err != nil {
		return nil, fmt.Errorf("failed to get connector: %w", err)
	}
	defer connectors.CloseConnector(dstConn)

	// The DBType guard above makes this cast expected to succeed; keep the
	// check as defense against a mismatched connector registration.
	sfConn, ok := dstConn.(*connsnowflake.SnowflakeConnector)
	if !ok {
		return nil, fmt.Errorf("failed to cast connector to snowflake connector")
	}

	return sfConn.CreateTablesFromExisting(req)
}
65 changes: 50 additions & 15 deletions flow/cmd/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ func (h *FlowRequestHandler) CreateCDCFlow(
return nil, fmt.Errorf("unable to update flow config in catalog: %w", err)
}

state := peerflow.NewCDCFlowState()
state := peerflow.NewCDCFlowWorkflowState()
_, err = h.temporalClient.ExecuteWorkflow(
ctx, // context
workflowOptions, // workflow start options
Expand Down Expand Up @@ -208,11 +208,6 @@ func (h *FlowRequestHandler) removeFlowEntryInCatalog(

func (h *FlowRequestHandler) CreateQRepFlow(
ctx context.Context, req *protos.CreateQRepFlowRequest) (*protos.CreateQRepFlowResponse, error) {
lastPartition := &protos.QRepPartition{
PartitionId: "not-applicable-partition",
Range: nil,
}

cfg := req.QrepConfig
workflowID := fmt.Sprintf("%s-qrepflow-%s", cfg.FlowJobName, uuid.New())
workflowOptions := client.StartWorkflowOptions{
Expand All @@ -225,14 +220,14 @@ func (h *FlowRequestHandler) CreateQRepFlow(
return nil, fmt.Errorf("unable to create flow job entry: %w", err)
}
}
numPartitionsProcessed := 0

state := peerflow.NewQRepFlowState()
_, err := h.temporalClient.ExecuteWorkflow(
ctx, // context
workflowOptions, // workflow start options
peerflow.QRepFlowWorkflow, // workflow function
cfg, // workflow input
lastPartition, // last partition
numPartitionsProcessed, // number of partitions processed
state,
)
if err != nil {
return nil, fmt.Errorf("unable to start QRepFlow workflow: %w", err)
Expand Down Expand Up @@ -340,19 +335,52 @@ func (h *FlowRequestHandler) ShutdownFlow(
}
}

delErr := h.removeFlowEntryInCatalog(req.FlowJobName)
if delErr != nil {
return &protos.ShutdownResponse{
Ok: false,
ErrorMessage: err.Error(),
}, err
if req.RemoveFlowEntry {
delErr := h.removeFlowEntryInCatalog(req.FlowJobName)
if delErr != nil {
return &protos.ShutdownResponse{
Ok: false,
ErrorMessage: err.Error(),
}, err
}
}

return &protos.ShutdownResponse{
Ok: true,
}, nil
}

// FlowStateChange pauses or resumes a CDC flow by signaling its workflow.
// STATE_PAUSED sends the pause signal; STATE_RUNNING sends a no-op signal to
// wake a paused workflow. Any other requested state is an error.
func (h *FlowRequestHandler) FlowStateChange(
	ctx context.Context,
	req *protos.FlowStateChangeRequest,
) (*protos.FlowStateChangeResponse, error) {
	var err error
	switch req.RequestedFlowState {
	case protos.FlowState_STATE_PAUSED:
		err = h.temporalClient.SignalWorkflow(
			ctx,
			req.WorkflowId,
			"",
			shared.CDCFlowSignalName,
			shared.PauseSignal,
		)
	case protos.FlowState_STATE_RUNNING:
		err = h.temporalClient.SignalWorkflow(
			ctx,
			req.WorkflowId,
			"",
			shared.CDCFlowSignalName,
			shared.NoopSignal,
		)
	default:
		// Previously an unrecognized state silently returned Ok without
		// signaling anything; surface it to the caller instead.
		return nil, fmt.Errorf("unsupported requested flow state: %v", req.RequestedFlowState)
	}
	if err != nil {
		return nil, fmt.Errorf("unable to signal PeerFlow workflow: %w", err)
	}

	return &protos.FlowStateChangeResponse{
		Ok: true,
	}, nil
}

func (h *FlowRequestHandler) waitForWorkflowClose(ctx context.Context, workflowID string) error {
expBackoff := backoff.NewExponentialBackOff()
expBackoff.InitialInterval = 3 * time.Second
Expand Down Expand Up @@ -503,6 +531,13 @@ func (h *FlowRequestHandler) CreatePeer(
}
sfConfig := sfConfigObject.SnowflakeConfig
encodedConfig, encodingErr = proto.Marshal(sfConfig)
case protos.DBType_BIGQUERY:
bqConfigObject, ok := config.(*protos.Peer_BigqueryConfig)
if !ok {
return wrongConfigResponse, nil
}
bqConfig := bqConfigObject.BigqueryConfig
encodedConfig, encodingErr = proto.Marshal(bqConfig)
case protos.DBType_SQLSERVER:
sqlServerConfigObject, ok := config.(*protos.Peer_SqlserverConfig)
if !ok {
Expand Down
9 changes: 8 additions & 1 deletion flow/connectors/bigquery/bigquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,13 @@ func NewBigQueryConnector(ctx context.Context, config *protos.BigqueryConfig) (*
return nil, fmt.Errorf("failed to create BigQuery client: %v", err)
}

datasetID := config.GetDatasetId()
_, checkErr := client.Dataset(datasetID).Metadata(ctx)
if checkErr != nil {
log.Errorf("failed to get dataset metadata: %v", checkErr)
return nil, fmt.Errorf("failed to get dataset metadata: %v", checkErr)
}

storageClient, err := bqsa.CreateStorageClient(ctx)
if err != nil {
return nil, fmt.Errorf("failed to create Storage client: %v", err)
Expand All @@ -174,7 +181,7 @@ func NewBigQueryConnector(ctx context.Context, config *protos.BigqueryConfig) (*
ctx: ctx,
bqConfig: config,
client: client,
datasetID: config.GetDatasetId(),
datasetID: datasetID,
storageClient: storageClient,
}, nil
}
Expand Down
Loading

0 comments on commit 31fc870

Please sign in to comment.