Merge branch 'main' into remove-temporal-testsuite
serprex committed Feb 28, 2024
2 parents e69edac + a8b1c6d commit 7197afb
Showing 19 changed files with 639 additions and 410 deletions.
79 changes: 41 additions & 38 deletions flow/activities/flowable.go
@@ -12,6 +12,7 @@ import (
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgxpool"
"go.temporal.io/sdk/activity"
"go.temporal.io/sdk/temporal"
"golang.org/x/sync/errgroup"
"google.golang.org/protobuf/proto"

@@ -233,7 +234,10 @@ func (a *FlowableActivity) MaintainPull(
case <-ticker.C:
activity.RecordHeartbeat(ctx, "keep session alive")
if err := srcConn.ReplPing(ctx); err != nil {
activity.GetLogger(ctx).Error("Failed to send keep alive ping to replication connection", slog.Any("error", err))
a.CdcCacheRw.Lock()
delete(a.CdcCache, sessionID)
a.CdcCacheRw.Unlock()
return temporal.NewNonRetryableApplicationError("connection to source down", "disconnect", err)
}
case <-ctx.Done():
a.CdcCacheRw.Lock()
@@ -293,7 +297,7 @@ func (a *FlowableActivity) SyncFlow(
return nil, err
}
if err := srcConn.ConnectionActive(ctx); err != nil {
return nil, err
return nil, temporal.NewNonRetryableApplicationError("connection to source down", "disconnect", nil)
}

shutdown := utils.HeartbeatRoutine(ctx, func() string {
@@ -613,9 +617,7 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context,
return fmt.Errorf("failed to update start time for partition: %w", err)
}

pullCtx, pullCancel := context.WithCancel(ctx)
defer pullCancel()
srcConn, err := connectors.GetQRepPullConnector(pullCtx, config.SourcePeer)
srcConn, err := connectors.GetQRepPullConnector(ctx, config.SourcePeer)
if err != nil {
a.Alerter.LogFlowError(ctx, config.FlowJobName, err)
return fmt.Errorf("failed to get qrep source connector: %w", err)
@@ -635,33 +637,42 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context,
})
defer shutdown()

var stream *model.QRecordStream
var rowsSynced int
bufferSize := shared.FetchAndChannelSize
var wg sync.WaitGroup

var goroutineErr error = nil
if config.SourcePeer.Type == protos.DBType_POSTGRES {
stream = model.NewQRecordStream(bufferSize)
wg.Add(1)

go func() {
errGroup, errCtx := errgroup.WithContext(ctx)
stream := model.NewQRecordStream(bufferSize)
errGroup.Go(func() error {
pgConn := srcConn.(*connpostgres.PostgresConnector)
tmp, err := pgConn.PullQRepRecordStream(ctx, config, partition, stream)
tmp, err := pgConn.PullQRepRecordStream(errCtx, config, partition, stream)
numRecords := int64(tmp)
if err != nil {
a.Alerter.LogFlowError(ctx, config.FlowJobName, err)
logger.Error("failed to pull records", slog.Any("error", err))
goroutineErr = err
return fmt.Errorf("failed to pull records: %w", err)
} else {
err = monitoring.UpdatePullEndTimeAndRowsForPartition(ctx,
err = monitoring.UpdatePullEndTimeAndRowsForPartition(errCtx,
a.CatalogPool, runUUID, partition, numRecords)
if err != nil {
logger.Error(err.Error())
goroutineErr = err
}
}
wg.Done()
}()
return nil
})

errGroup.Go(func() error {
rowsSynced, err = dstConn.SyncQRepRecords(ctx, config, partition, stream)
if err != nil {
a.Alerter.LogFlowError(ctx, config.FlowJobName, err)
return fmt.Errorf("failed to sync records: %w", err)
}
return nil
})

err = errGroup.Wait()
if err != nil {
a.Alerter.LogFlowError(ctx, config.FlowJobName, err)
return err
}
} else {
recordBatch, err := srcConn.PullQRepRecords(ctx, config, partition)
if err != nil {
@@ -675,35 +686,27 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context,
return err
}

stream, err = recordBatch.ToQRecordStream(bufferSize)
stream, err := recordBatch.ToQRecordStream(bufferSize)
if err != nil {
a.Alerter.LogFlowError(ctx, config.FlowJobName, err)
return fmt.Errorf("failed to convert to qrecord stream: %w", err)
}
}

rowsSynced, err := dstConn.SyncQRepRecords(ctx, config, partition, stream)
if err != nil {
a.Alerter.LogFlowError(ctx, config.FlowJobName, err)
return fmt.Errorf("failed to sync records: %w", err)
}

if rowsSynced == 0 {
logger.Info("no records to push for partition " + partition.PartitionId)
pullCancel()
} else {
wg.Wait()
if goroutineErr != nil {
a.Alerter.LogFlowError(ctx, config.FlowJobName, goroutineErr)
return goroutineErr
rowsSynced, err = dstConn.SyncQRepRecords(ctx, config, partition, stream)
if err != nil {
a.Alerter.LogFlowError(ctx, config.FlowJobName, err)
return fmt.Errorf("failed to sync records: %w", err)
}
}

if rowsSynced > 0 {
logger.Info(fmt.Sprintf("pushed %d records", rowsSynced))
err := monitoring.UpdateRowsSyncedForPartition(ctx, a.CatalogPool, rowsSynced, runUUID, partition)
if err != nil {
return err
}

logger.Info(fmt.Sprintf("pushed %d records", rowsSynced))
} else {
logger.Info("no records to push for partition " + partition.PartitionId)
}

err = monitoring.UpdateEndTimeForPartition(ctx, a.CatalogPool, runUUID, partition)
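Note on the replicateQRepPartition change above: the sync.WaitGroup with a shared goroutineErr is replaced by an errgroup, so the pull and sync goroutines run under one derived context, the first error cancels the other side, and the separate pullCtx/pullCancel bookkeeping goes away. Below is a minimal sketch of that producer/consumer wiring in isolation; the channel and function names are placeholders, not PeerDB's actual API.

```go
package main

import (
	"context"
	"fmt"

	"golang.org/x/sync/errgroup"
)

// replicateSketch mirrors the wiring above: one errgroup, one derived context,
// a producer goroutine feeding a stream and a consumer goroutine draining it.
func replicateSketch(ctx context.Context) error {
	errGroup, errCtx := errgroup.WithContext(ctx)
	records := make(chan int, 16) // stand-in for model.QRecordStream

	// Pull side: push records, bail out early if the other goroutine failed.
	errGroup.Go(func() error {
		defer close(records)
		for i := 0; i < 100; i++ {
			select {
			case records <- i:
			case <-errCtx.Done():
				return errCtx.Err()
			}
		}
		return nil
	})

	// Sync side: drain the stream; returning an error cancels errCtx,
	// which unblocks the pull side.
	errGroup.Go(func() error {
		count := 0
		for range records {
			count++
		}
		fmt.Printf("synced %d records\n", count)
		return nil
	})

	// Wait returns the first non-nil error from either goroutine.
	return errGroup.Wait()
}

func main() {
	if err := replicateSketch(context.Background()); err != nil {
		fmt.Println("replication failed:", err)
	}
}
```

The same file also starts returning temporal.NewNonRetryableApplicationError("connection to source down", "disconnect", err) when the source connection is known to be dead, so Temporal fails the activity instead of retrying an operation that cannot succeed.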
8 changes: 3 additions & 5 deletions flow/e2e/bigquery/peer_flow_bq_test.go
@@ -197,13 +197,11 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Invalid_Connection_Config() {
tc := e2e.NewTemporalClient(s.t)

env := e2e.ExecutePeerflow(tc, peerflow.CDCFlowWorkflow, nil, nil)

// Verify workflow completes
require.True(s.t, env.Finished())
err := env.Error()
e2e.EnvWaitFor(s.t, env, 3*time.Minute, "finish", env.Finished)
require.Error(s.t, env.Error())

// assert that error contains "invalid connection configs"
require.Contains(s.t, err.Error(), "invalid connection configs")
require.Contains(s.t, env.Error(), "invalid connection configs")
}

func (s PeerFlowE2ETestSuiteBQ) Test_Complete_Flow_No_Data() {
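With the Temporal test suite removed, these e2e tests no longer get a synchronous env.Finished() guarantee, so they poll for completion via e2e.EnvWaitFor before inspecting env.Error(). The helper below is only a sketch of what such a wait loop can look like; the signature is an assumption modeled on the call sites in this diff, not PeerDB's actual implementation.

```go
package e2esketch

import (
	"testing"
	"time"
)

// WaitFor polls cond until it returns true or the timeout elapses,
// failing the test with the given reason on timeout.
func WaitFor(t *testing.T, timeout time.Duration, reason string, cond func() bool) {
	t.Helper()
	deadline := time.Now().Add(timeout)
	for !cond() {
		if time.Now().After(deadline) {
			t.Fatalf("timed out waiting for %q", reason)
		}
		time.Sleep(time.Second)
	}
}
```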
64 changes: 35 additions & 29 deletions flow/e2e/bigquery/qrep_flow_bq_test.go
@@ -4,6 +4,7 @@ import (
"context"
"fmt"
"strings"
"time"

"github.com/stretchr/testify/require"

@@ -22,6 +23,9 @@ func (s PeerFlowE2ETestSuiteBQ) setupTimeTable(tableName string) {
"watermark_ts timestamp",
"mytimestamp timestamp",
"mytztimestamp timestamptz",
"medieval timestamptz",
"mybaddate date",
"mydate date",
}
tblFieldStr := strings.Join(tblFields, ",")
_, err := s.Conn().Exec(context.Background(), fmt.Sprintf(`
@@ -32,14 +36,22 @@ func (s PeerFlowE2ETestSuiteBQ) setupTimeTable(tableName string) {
require.NoError(s.t, err)

var rows []string
row := `(CURRENT_TIMESTAMP,'10001-03-14 23:05:52','50001-03-14 23:05:52.216809+00')`
row := `(CURRENT_TIMESTAMP,
'10001-03-14 23:05:52',
'50001-03-14 23:05:52.216809+00',
'1534-03-14 23:05:52.216809+00',
'10000-03-14',
CURRENT_TIMESTAMP)`
rows = append(rows, row)

_, err = s.Conn().Exec(context.Background(), fmt.Sprintf(`
INSERT INTO e2e_test_%s.%s (
watermark_ts,
mytimestamp,
mytztimestamp
mytztimestamp,
medieval,
mybaddate,
mydate
) VALUES %s;
`, s.bqSuffix, tableName, strings.Join(rows, ",")))
require.NoError(s.t, err)
@@ -66,26 +78,21 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Complete_QRep_Flow_Avro() {
"")
require.NoError(s.t, err)
env := e2e.RunQrepFlowWorkflow(tc, qrepConfig)

// Verify workflow completes without error
require.True(s.t, env.Finished())

err = env.Error()
require.NoError(s.t, err)
e2e.EnvWaitFor(s.t, env, time.Minute, "finish", env.Finished)
require.NoError(s.t, env.Error())

e2e.RequireEqualTables(s, tblName, "*")
}

func (s PeerFlowE2ETestSuiteBQ) Test_Invalid_Timestamps_QRep() {
func (s PeerFlowE2ETestSuiteBQ) Test_Invalid_Timestamps_And_Date_QRep() {
tc := e2e.NewTemporalClient(s.t)

tblName := "test_qrep_flow_avro_bq"
tblName := "test_invalid_time_bq"
s.setupTimeTable(tblName)

query := fmt.Sprintf("SELECT * FROM e2e_test_%s.%s WHERE watermark_ts BETWEEN {{.start}} AND {{.end}}",
s.bqSuffix, tblName)

qrepConfig, err := e2e.CreateQRepWorkflowConfig("test_qrep_flow_avro",
qrepConfig, err := e2e.CreateQRepWorkflowConfig("test_invalid_time_bq",
fmt.Sprintf("e2e_test_%s.%s", s.bqSuffix, tblName),
tblName,
query,
@@ -96,20 +103,23 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Invalid_Timestamps_QRep() {
qrepConfig.WatermarkColumn = "watermark_ts"
require.NoError(s.t, err)
env := e2e.RunQrepFlowWorkflow(tc, qrepConfig)
e2e.EnvWaitFor(s.t, env, time.Minute, "finish", env.Finished)
require.NoError(s.t, env.Error())

// Verify workflow completes without error
require.True(s.t, env.Finished())

err = env.Error()
require.NoError(s.t, err)
goodValues := []string{"watermark_ts", "mydate", "medieval"}
badValues := []string{"mytimestamp", "mytztimestamp", "mybaddate"}

ok, err := s.bqHelper.CheckNull(tblName, []string{"mytimestamp"})
require.NoError(s.t, err)
require.False(s.t, ok)
for _, col := range goodValues {
ok, err := s.bqHelper.CheckNull(tblName, []string{col})
require.NoError(s.t, err)
require.True(s.t, ok)
}

ok, err = s.bqHelper.CheckNull(tblName, []string{"mytztimestamp"})
require.NoError(s.t, err)
require.False(s.t, ok)
for _, col := range badValues {
ok, err := s.bqHelper.CheckNull(tblName, []string{col})
require.NoError(s.t, err)
require.False(s.t, ok)
}
}

func (s PeerFlowE2ETestSuiteBQ) Test_PeerDB_Columns_QRep_BQ() {
@@ -133,12 +143,8 @@ func (s PeerFlowE2ETestSuiteBQ) Test_PeerDB_Columns_QRep_BQ() {
"_PEERDB_SYNCED_AT")
require.NoError(s.t, err)
env := e2e.RunQrepFlowWorkflow(tc, qrepConfig)

// Verify workflow completes without error
require.True(s.t, env.Finished())

err = env.Error()
require.NoError(s.t, err)
e2e.EnvWaitFor(s.t, env, time.Minute, "finish", env.Finished)
require.NoError(s.t, env.Error())

err = s.checkPeerdbColumns(tblName, false)
require.NoError(s.t, err)
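The renamed Test_Invalid_Timestamps_And_Date_QRep relies on the fact that BigQuery's TIMESTAMP and DATE types only cover years 0001 through 9999: the year-10001 and year-50001 timestamps and the year-10000 date cannot be stored and are expected to arrive as NULL, while the 1534 timestamptz and the CURRENT_TIMESTAMP-based columns are in range and survive. A guard along these lines is what the assertions imply; the helper name below is illustrative, not PeerDB's actual code.

```go
package main

import "time"

// clampToBigQueryRange returns nil for values outside BigQuery's supported
// TIMESTAMP range (years 1 through 9999); such values end up as NULL in the
// destination table, which is what the badValues assertions check.
func clampToBigQueryRange(t time.Time) *time.Time {
	minTS := time.Date(1, time.January, 1, 0, 0, 0, 0, time.UTC)
	maxTS := time.Date(9999, time.December, 31, 23, 59, 59, 999999999, time.UTC)
	if t.Before(minTS) || t.After(maxTS) {
		return nil
	}
	return &t
}
```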
6 changes: 3 additions & 3 deletions flow/e2e/postgres/peer_flow_pg_test.go
@@ -1118,13 +1118,13 @@ func (s PeerFlowE2ETestSuitePG) Test_Dynamic_Mirror_Config_Via_Signals() {
}

getWorkflowState := func() peerflow.CDCFlowWorkflowState {
var workflowState peerflow.CDCFlowWorkflowState
var state peerflow.CDCFlowWorkflowState
val, err := env.Query(shared.CDCFlowStateQuery)
e2e.EnvNoError(s.t, env, err)
err = val.Get(&workflowState)
err = val.Get(&state)
e2e.EnvNoError(s.t, env, err)

return workflowState
return state
}

getFlowStatus := func() protos.FlowStatus {
58 changes: 44 additions & 14 deletions flow/e2e/postgres/qrep_flow_pg_test.go
@@ -75,8 +75,11 @@ func SetupSuite(t *testing.T) PeerFlowE2ETestSuitePG {
func (s PeerFlowE2ETestSuitePG) setupSourceTable(tableName string, rowCount int) {
err := e2e.CreateTableForQRep(s.Conn(), s.suffix, tableName)
require.NoError(s.t, err)
err = e2e.PopulateSourceTable(s.Conn(), s.suffix, tableName, rowCount)
require.NoError(s.t, err)

if rowCount > 0 {
err = e2e.PopulateSourceTable(s.Conn(), s.suffix, tableName, rowCount)
require.NoError(s.t, err)
}
}

func (s PeerFlowE2ETestSuitePG) comparePGTables(srcSchemaQualified, dstSchemaQualified, selector string) error {
@@ -245,18 +248,14 @@ func (s PeerFlowE2ETestSuitePG) Test_Complete_QRep_Flow_Multi_Insert_PG() {
require.NoError(s.t, err)

env := e2e.RunQrepFlowWorkflow(tc, qrepConfig)

// Verify workflow completes without error
require.True(s.t, env.Finished())

err = env.Error()
require.NoError(s.t, err)
e2e.EnvWaitFor(s.t, env, time.Minute, "finish", env.Finished)
require.NoError(s.t, env.Error())

err = s.comparePGTables(srcSchemaQualified, dstSchemaQualified, "*")
require.NoError(s.t, err)
}

func (s PeerFlowE2ETestSuitePG) Test_Setup_Destination_And_PeerDB_Columns_QRep_PG() {
func (s PeerFlowE2ETestSuitePG) Test_PeerDB_Columns_QRep_PG() {
tc := e2e.NewTemporalClient(s.t)

numRows := 10
@@ -287,13 +286,44 @@ func (s PeerFlowE2ETestSuitePG) Test_Setup_Destination_And_PeerDB_Columns_QRep_P
require.NoError(s.t, err)

env := e2e.RunQrepFlowWorkflow(tc, qrepConfig)
e2e.EnvWaitFor(s.t, env, time.Minute, "finish", env.Finished)
require.NoError(s.t, env.Error())

// Verify workflow completes without error
require.True(s.t, env.Finished())

err = env.Error()
err = s.checkSyncedAt(dstSchemaQualified)
require.NoError(s.t, err)
}

err = s.checkSyncedAt(dstSchemaQualified)
func (s PeerFlowE2ETestSuitePG) Test_No_Rows_QRep_PG() {
tc := e2e.NewTemporalClient(s.t)

numRows := 0

srcTable := "test_no_rows_qrep_pg_1"
s.setupSourceTable(srcTable, numRows)

dstTable := "test_no_rows_qrep_pg_2"

srcSchemaQualified := fmt.Sprintf("%s_%s.%s", "e2e_test", s.suffix, srcTable)
dstSchemaQualified := fmt.Sprintf("%s_%s.%s", "e2e_test", s.suffix, dstTable)

query := fmt.Sprintf("SELECT * FROM e2e_test_%s.%s WHERE updated_at BETWEEN {{.start}} AND {{.end}}",
s.suffix, srcTable)

postgresPeer := e2e.GeneratePostgresPeer()

qrepConfig, err := e2e.CreateQRepWorkflowConfig(
"test_no_rows_qrep_pg",
srcSchemaQualified,
dstSchemaQualified,
query,
postgresPeer,
"",
true,
"_PEERDB_SYNCED_AT",
)
require.NoError(s.t, err)

env := e2e.RunQrepFlowWorkflow(tc, qrepConfig)
e2e.EnvWaitFor(s.t, env, time.Minute, "finish", env.Finished)
require.NoError(s.t, env.Error())
}