Skip to content

Commit

Permalink
Merge branch 'main' into insert-soft-delete-same-batch
Browse files Browse the repository at this point in the history
  • Loading branch information
heavycrystal authored Dec 2, 2023
2 parents 6880dfa + ad769d4 commit 4287fdb
Show file tree
Hide file tree
Showing 22 changed files with 601 additions and 542 deletions.
8 changes: 4 additions & 4 deletions flow/cmd/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -503,12 +503,12 @@ func (h *FlowRequestHandler) ValidatePeer(
}, nil
}

status := conn.ConnectionActive()
if !status {
connErr := conn.ConnectionActive()
if connErr != nil {
return &protos.ValidatePeerResponse{
Status: protos.ValidatePeerStatus_INVALID,
Message: fmt.Sprintf("failed to establish active connection to %s peer %s.",
req.Peer.Type, req.Peer.Name),
Message: fmt.Sprintf("failed to establish active connection to %s peer %s: %v",
req.Peer.Type, req.Peer.Name, connErr),
}, nil
}

Expand Down
12 changes: 10 additions & 2 deletions flow/connectors/bigquery/bigquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,8 +204,16 @@ func (c *BigQueryConnector) Close() error {
}

// ConnectionActive returns true if the connection is active.
func (c *BigQueryConnector) ConnectionActive() bool {
return c.client != nil
func (c *BigQueryConnector) ConnectionActive() error {
_, err := c.client.Dataset(c.datasetID).Metadata(c.ctx)
if err != nil {
return fmt.Errorf("failed to get dataset metadata: %v", err)
}

if c.client == nil {
return fmt.Errorf("BigQuery client is nil")
}
return nil
}

// NeedsSetupMetadataTables returns true if the metadata tables need to be set up.
Expand Down
3 changes: 1 addition & 2 deletions flow/connectors/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ var ErrUnsupportedFunctionality = errors.New("requested connector does not suppo

type Connector interface {
Close() error
ConnectionActive() bool
ConnectionActive() error
}

type CDCPullConnector interface {
Expand Down Expand Up @@ -223,7 +223,6 @@ func GetConnector(ctx context.Context, peer *protos.Peer) (Connector, error) {
return nil, fmt.Errorf("missing snowflake config for %s peer %s", peer.Type.String(), peer.Name)
}
return connsnowflake.NewSnowflakeConnector(ctx, sfConfig)

case protos.DBType_SQLSERVER:
sqlServerConfig := peer.GetSqlserverConfig()
if sqlServerConfig == nil {
Expand Down
4 changes: 2 additions & 2 deletions flow/connectors/eventhub/eventhub.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,8 @@ func (c *EventHubConnector) Close() error {
return allErrors
}

func (c *EventHubConnector) ConnectionActive() bool {
return true
func (c *EventHubConnector) ConnectionActive() error {
return nil
}

func (c *EventHubConnector) InitializeTableSchema(req map[string]*protos.TableSchema) error {
Expand Down
7 changes: 4 additions & 3 deletions flow/connectors/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,11 +104,12 @@ func (c *PostgresConnector) Close() error {
}

// ConnectionActive returns true if the connection is active.
func (c *PostgresConnector) ConnectionActive() bool {
func (c *PostgresConnector) ConnectionActive() error {
if c.pool == nil {
return false
return fmt.Errorf("connection pool is nil")
}
return c.pool.Ping(c.ctx) == nil
pingErr := c.pool.Ping(c.ctx)
return pingErr
}

// NeedsSetupMetadataTables returns true if the metadata tables need to be set up.
Expand Down
4 changes: 2 additions & 2 deletions flow/connectors/postgres/postgres_repl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,12 +94,12 @@ func (suite *PostgresReplicationSnapshotTestSuite) TearDownSuite() {
err = teardownTx.Commit(context.Background())
require.NoError(suite.T(), err)

suite.True(suite.connector.ConnectionActive())
suite.True(suite.connector.ConnectionActive() == nil)

err = suite.connector.Close()
require.NoError(suite.T(), err)

suite.False(suite.connector.ConnectionActive())
suite.False(suite.connector.ConnectionActive() == nil)
}

func (suite *PostgresReplicationSnapshotTestSuite) TestSimpleSlotCreation() {
Expand Down
4 changes: 2 additions & 2 deletions flow/connectors/postgres/postgres_schema_delta_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,10 +67,10 @@ func (suite *PostgresSchemaDeltaTestSuite) TearDownSuite() {
err = teardownTx.Commit(context.Background())
suite.failTestError(err)

suite.True(suite.connector.ConnectionActive())
suite.True(suite.connector.ConnectionActive() == nil)
err = suite.connector.Close()
suite.failTestError(err)
suite.False(suite.connector.ConnectionActive())
suite.False(suite.connector.ConnectionActive() == nil)
}

func (suite *PostgresSchemaDeltaTestSuite) TestSimpleAddColumn() {
Expand Down
22 changes: 13 additions & 9 deletions flow/connectors/s3/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,12 +68,6 @@ func NewS3Connector(ctx context.Context,
return nil, err
}

validErr := ValidCheck(s3Client, config.Url, pgMetadata)
if validErr != nil {
log.Errorf("failed to validate s3 connector: %v", validErr)
return nil, validErr
}

return &S3Connector{
ctx: ctx,
url: config.Url,
Expand Down Expand Up @@ -140,9 +134,19 @@ func ValidCheck(s3Client *s3.S3, bucketURL string, metadataDB *metadataStore.Pos
return nil
}

func (c *S3Connector) ConnectionActive() bool {
_, err := c.client.ListBuckets(nil)
return err == nil
func (c *S3Connector) ConnectionActive() error {
_, listErr := c.client.ListBuckets(nil)
if listErr != nil {
return listErr
}

validErr := ValidCheck(&c.client, c.url, c.pgMetadata)
if validErr != nil {
log.Errorf("failed to validate s3 connector: %v", validErr)
return validErr
}

return nil
}

func (c *S3Connector) NeedsSetupMetadataTables() bool {
Expand Down
9 changes: 6 additions & 3 deletions flow/connectors/snowflake/snowflake.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,11 +169,14 @@ func (c *SnowflakeConnector) Close() error {
return nil
}

func (c *SnowflakeConnector) ConnectionActive() bool {
func (c *SnowflakeConnector) ConnectionActive() error {
if c == nil || c.database == nil {
return false
return fmt.Errorf("SnowflakeConnector is nil")
}
return c.database.PingContext(c.ctx) == nil

// This also checks if database exists
err := c.database.PingContext(c.ctx)
return err
}

func (c *SnowflakeConnector) NeedsSetupMetadataTables() bool {
Expand Down
2 changes: 1 addition & 1 deletion flow/connectors/sql/query_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (
)

type SQLQueryExecutor interface {
ConnectionActive() bool
ConnectionActive() error
Close() error

CreateSchema(schemaName string) error
Expand Down
6 changes: 3 additions & 3 deletions flow/connectors/sqlserver/sqlserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,9 @@ func (c *SQLServerConnector) Close() error {
}

// ConnectionActive checks if the connection is still active
func (c *SQLServerConnector) ConnectionActive() bool {
func (c *SQLServerConnector) ConnectionActive() error {
if err := c.db.Ping(); err != nil {
return false
return err
}
return true
return nil
}
6 changes: 3 additions & 3 deletions flow/e2e/sqlserver/sqlserver_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,9 @@ func NewSQLServerHelper(name string) (*SQLServerHelper, error) {
return nil, err
}

testConn := connector.ConnectionActive()
if !testConn {
return nil, fmt.Errorf("invalid connection configs")
connErr := connector.ConnectionActive()
if connErr != nil {
return nil, fmt.Errorf("invalid connection configs: %v", connErr)
}

rndNum, err := util.RandomUInt64()
Expand Down
10 changes: 5 additions & 5 deletions flow/workflows/qrep_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -407,6 +407,11 @@ func QRepFlowWorkflow(

q := NewQRepFlowExecution(ctx, config, runUUID)

err = q.SetupWatermarkTableOnDestination(ctx)
if err != nil {
return fmt.Errorf("failed to setup watermark table: %w", err)
}

err = q.SetupMetadataTables(ctx)
if err != nil {
return fmt.Errorf("failed to setup metadata tables: %w", err)
Expand All @@ -418,11 +423,6 @@ func QRepFlowWorkflow(
return err
}

err = q.SetupWatermarkTableOnDestination(ctx)
if err != nil {
return fmt.Errorf("failed to setup watermark table: %w", err)
}

logger.Info("fetching partitions to replicate for peer flow - ", config.FlowJobName)
partitions, err := q.GetPartitions(ctx, state.LastPartition)
if err != nil {
Expand Down
Loading

0 comments on commit 4287fdb

Please sign in to comment.