Merge branch 'main' into remove-temporal-testsuite
serprex authored Feb 28, 2024
2 parents e69edac + a8b1c6d commit 46208f3
Showing 15 changed files with 611 additions and 322 deletions.
79 changes: 41 additions & 38 deletions flow/activities/flowable.go
@@ -12,6 +12,7 @@ import (
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgxpool"
"go.temporal.io/sdk/activity"
"go.temporal.io/sdk/temporal"
"golang.org/x/sync/errgroup"
"google.golang.org/protobuf/proto"

@@ -233,7 +234,10 @@ func (a *FlowableActivity) MaintainPull(
case <-ticker.C:
activity.RecordHeartbeat(ctx, "keep session alive")
if err := srcConn.ReplPing(ctx); err != nil {
activity.GetLogger(ctx).Error("Failed to send keep alive ping to replication connection", slog.Any("error", err))
a.CdcCacheRw.Lock()
delete(a.CdcCache, sessionID)
a.CdcCacheRw.Unlock()
return temporal.NewNonRetryableApplicationError("connection to source down", "disconnect", err)
}
case <-ctx.Done():
a.CdcCacheRw.Lock()
@@ -293,7 +297,7 @@ func (a *FlowableActivity) SyncFlow(
return nil, err
}
if err := srcConn.ConnectionActive(ctx); err != nil {
return nil, err
return nil, temporal.NewNonRetryableApplicationError("connection to source down", "disconnect", nil)
}

shutdown := utils.HeartbeatRoutine(ctx, func() string {
@@ -613,9 +617,7 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context,
return fmt.Errorf("failed to update start time for partition: %w", err)
}

pullCtx, pullCancel := context.WithCancel(ctx)
defer pullCancel()
srcConn, err := connectors.GetQRepPullConnector(pullCtx, config.SourcePeer)
srcConn, err := connectors.GetQRepPullConnector(ctx, config.SourcePeer)
if err != nil {
a.Alerter.LogFlowError(ctx, config.FlowJobName, err)
return fmt.Errorf("failed to get qrep source connector: %w", err)
@@ -635,33 +637,42 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context,
})
defer shutdown()

var stream *model.QRecordStream
var rowsSynced int
bufferSize := shared.FetchAndChannelSize
var wg sync.WaitGroup

var goroutineErr error = nil
if config.SourcePeer.Type == protos.DBType_POSTGRES {
stream = model.NewQRecordStream(bufferSize)
wg.Add(1)

go func() {
errGroup, errCtx := errgroup.WithContext(ctx)
stream := model.NewQRecordStream(bufferSize)
errGroup.Go(func() error {
pgConn := srcConn.(*connpostgres.PostgresConnector)
tmp, err := pgConn.PullQRepRecordStream(ctx, config, partition, stream)
tmp, err := pgConn.PullQRepRecordStream(errCtx, config, partition, stream)
numRecords := int64(tmp)
if err != nil {
a.Alerter.LogFlowError(ctx, config.FlowJobName, err)
logger.Error("failed to pull records", slog.Any("error", err))
goroutineErr = err
return fmt.Errorf("failed to pull records: %w", err)
} else {
err = monitoring.UpdatePullEndTimeAndRowsForPartition(ctx,
err = monitoring.UpdatePullEndTimeAndRowsForPartition(errCtx,
a.CatalogPool, runUUID, partition, numRecords)
if err != nil {
logger.Error(err.Error())
goroutineErr = err
}
}
wg.Done()
}()
return nil
})

errGroup.Go(func() error {
rowsSynced, err = dstConn.SyncQRepRecords(ctx, config, partition, stream)
if err != nil {
a.Alerter.LogFlowError(ctx, config.FlowJobName, err)
return fmt.Errorf("failed to sync records: %w", err)
}
return nil
})

err = errGroup.Wait()
if err != nil {
a.Alerter.LogFlowError(ctx, config.FlowJobName, err)
return err
}
} else {
recordBatch, err := srcConn.PullQRepRecords(ctx, config, partition)
if err != nil {
@@ -675,35 +686,27 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context,
return err
}

stream, err = recordBatch.ToQRecordStream(bufferSize)
stream, err := recordBatch.ToQRecordStream(bufferSize)
if err != nil {
a.Alerter.LogFlowError(ctx, config.FlowJobName, err)
return fmt.Errorf("failed to convert to qrecord stream: %w", err)
}
}

rowsSynced, err := dstConn.SyncQRepRecords(ctx, config, partition, stream)
if err != nil {
a.Alerter.LogFlowError(ctx, config.FlowJobName, err)
return fmt.Errorf("failed to sync records: %w", err)
}

if rowsSynced == 0 {
logger.Info("no records to push for partition " + partition.PartitionId)
pullCancel()
} else {
wg.Wait()
if goroutineErr != nil {
a.Alerter.LogFlowError(ctx, config.FlowJobName, goroutineErr)
return goroutineErr
rowsSynced, err = dstConn.SyncQRepRecords(ctx, config, partition, stream)
if err != nil {
a.Alerter.LogFlowError(ctx, config.FlowJobName, err)
return fmt.Errorf("failed to sync records: %w", err)
}
}

if rowsSynced > 0 {
logger.Info(fmt.Sprintf("pushed %d records", rowsSynced))
err := monitoring.UpdateRowsSyncedForPartition(ctx, a.CatalogPool, rowsSynced, runUUID, partition)
if err != nil {
return err
}

logger.Info(fmt.Sprintf("pushed %d records", rowsSynced))
} else {
logger.Info("no records to push for partition " + partition.PartitionId)
}

err = monitoring.UpdateEndTimeForPartition(ctx, a.CatalogPool, runUUID, partition)
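The replicateQRepPartition rewrite above drops the sync.WaitGroup plus shared goroutineErr variable in favour of golang.org/x/sync/errgroup: pull and sync run as two errGroup.Go goroutines, a failure in either one cancels the shared errCtx, and a single errGroup.Wait() surfaces the first error. A minimal, self-contained sketch of that pattern (the channel and the pull/sync stand-ins below are illustrative only, not PeerDB APIs):

```go
package main

import (
	"context"
	"errors"
	"fmt"

	"golang.org/x/sync/errgroup"
)

// pullRecords stands in for the pull side (PullQRepRecordStream): it produces
// rows until it finishes or the shared context is cancelled.
func pullRecords(ctx context.Context, stream chan<- int) error {
	defer close(stream)
	for i := 0; i < 5; i++ {
		select {
		case stream <- i:
		case <-ctx.Done():
			return ctx.Err()
		}
	}
	return nil
}

// syncRecords stands in for the sync side (SyncQRepRecords): it consumes the
// stream and may fail independently of the producer.
func syncRecords(stream <-chan int) (int, error) {
	synced := 0
	for range stream {
		synced++
		if synced == 3 {
			return synced, errors.New("destination write failed")
		}
	}
	return synced, nil
}

func main() {
	errGroup, errCtx := errgroup.WithContext(context.Background())
	stream := make(chan int, 4)

	errGroup.Go(func() error {
		return pullRecords(errCtx, stream)
	})

	var rowsSynced int
	errGroup.Go(func() error {
		var err error
		rowsSynced, err = syncRecords(stream)
		return err
	})

	// Wait returns the first error from either goroutine; errCtx is cancelled
	// as soon as one of them fails, so the other side stops promptly instead
	// of blocking forever on the stream.
	if err := errGroup.Wait(); err != nil {
		fmt.Println("partition replication failed:", err)
	}
	fmt.Println("rows synced before the failure:", rowsSynced)
}
```

The same file also starts returning temporal.NewNonRetryableApplicationError when the source connection is down (in MaintainPull and SyncFlow), which marks the failure as non-retryable so the activity is not retried against a replication session that has already been torn down.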
41 changes: 29 additions & 12 deletions flow/e2e/bigquery/qrep_flow_bq_test.go
@@ -22,6 +22,9 @@ func (s PeerFlowE2ETestSuiteBQ) setupTimeTable(tableName string) {
"watermark_ts timestamp",
"mytimestamp timestamp",
"mytztimestamp timestamptz",
"medieval timestamptz",
"mybaddate date",
"mydate date",
}
tblFieldStr := strings.Join(tblFields, ",")
_, err := s.Conn().Exec(context.Background(), fmt.Sprintf(`
@@ -32,14 +35,22 @@ func (s PeerFlowE2ETestSuiteBQ) setupTimeTable(tableName string) {
require.NoError(s.t, err)

var rows []string
row := `(CURRENT_TIMESTAMP,'10001-03-14 23:05:52','50001-03-14 23:05:52.216809+00')`
row := `(CURRENT_TIMESTAMP,
'10001-03-14 23:05:52',
'50001-03-14 23:05:52.216809+00',
'1534-03-14 23:05:52.216809+00',
'10000-03-14',
CURRENT_TIMESTAMP)`
rows = append(rows, row)

_, err = s.Conn().Exec(context.Background(), fmt.Sprintf(`
INSERT INTO e2e_test_%s.%s (
watermark_ts,
mytimestamp,
mytztimestamp
mytztimestamp,
medieval,
mybaddate,
mydate
) VALUES %s;
`, s.bqSuffix, tableName, strings.Join(rows, ",")))
require.NoError(s.t, err)
@@ -76,16 +87,15 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Complete_QRep_Flow_Avro() {
e2e.RequireEqualTables(s, tblName, "*")
}

func (s PeerFlowE2ETestSuiteBQ) Test_Invalid_Timestamps_QRep() {
func (s PeerFlowE2ETestSuiteBQ) Test_Invalid_Timestamps_And_Date_QRep() {
tc := e2e.NewTemporalClient(s.t)

tblName := "test_qrep_flow_avro_bq"
tblName := "test_invalid_time_bq"
s.setupTimeTable(tblName)

query := fmt.Sprintf("SELECT * FROM e2e_test_%s.%s WHERE watermark_ts BETWEEN {{.start}} AND {{.end}}",
s.bqSuffix, tblName)

qrepConfig, err := e2e.CreateQRepWorkflowConfig("test_qrep_flow_avro",
qrepConfig, err := e2e.CreateQRepWorkflowConfig("test_invalid_time_bq",
fmt.Sprintf("e2e_test_%s.%s", s.bqSuffix, tblName),
tblName,
query,
@@ -103,13 +113,20 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Invalid_Timestamps_QRep() {
err = env.Error()
require.NoError(s.t, err)

ok, err := s.bqHelper.CheckNull(tblName, []string{"mytimestamp"})
require.NoError(s.t, err)
require.False(s.t, ok)
goodValues := []string{"watermark_ts", "mydate", "medieval"}
badValues := []string{"mytimestamp", "mytztimestamp", "mybaddate"}

ok, err = s.bqHelper.CheckNull(tblName, []string{"mytztimestamp"})
require.NoError(s.t, err)
require.False(s.t, ok)
for _, col := range goodValues {
ok, err := s.bqHelper.CheckNull(tblName, []string{col})
require.NoError(s.t, err)
require.True(s.t, ok)
}

for _, col := range badValues {
ok, err := s.bqHelper.CheckNull(tblName, []string{col})
require.NoError(s.t, err)
require.False(s.t, ok)
}
}

func (s PeerFlowE2ETestSuiteBQ) Test_PeerDB_Columns_QRep_BQ() {
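The assertions in Test_Invalid_Timestamps_And_Date_QRep read as though s.bqHelper.CheckNull returns true when none of the listed columns contain a NULL, so the in-range values (watermark_ts, mydate, medieval) survive the sync while the out-of-range ones (mytimestamp, mytztimestamp, mybaddate) are nulled by the converter. A rough sketch of an equivalent check against BigQuery, using a hypothetical helper name and that assumed semantics (this is not PeerDB's actual helper):

```go
package e2ehelpers

import (
	"context"
	"fmt"

	"cloud.google.com/go/bigquery"
)

// columnsHaveNoNulls reports whether every listed column of dataset.table is
// free of NULLs, mirroring the behaviour the test appears to expect from
// CheckNull.
func columnsHaveNoNulls(ctx context.Context, client *bigquery.Client,
	dataset, table string, cols []string,
) (bool, error) {
	for _, col := range cols {
		q := client.Query(fmt.Sprintf(
			"SELECT COUNT(*) FROM `%s.%s` WHERE %s IS NULL", dataset, table, col))
		it, err := q.Read(ctx)
		if err != nil {
			return false, err
		}
		var row []bigquery.Value
		if err := it.Next(&row); err != nil {
			return false, err
		}
		// COUNT(*) comes back as an int64 value; any NULL fails the check.
		if row[0].(int64) > 0 {
			return false, nil
		}
	}
	return true, nil
}
```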
6 changes: 3 additions & 3 deletions flow/e2e/postgres/peer_flow_pg_test.go
@@ -1118,13 +1118,13 @@ func (s PeerFlowE2ETestSuitePG) Test_Dynamic_Mirror_Config_Via_Signals() {
}

getWorkflowState := func() peerflow.CDCFlowWorkflowState {
var workflowState peerflow.CDCFlowWorkflowState
var state peerflow.CDCFlowWorkflowState
val, err := env.Query(shared.CDCFlowStateQuery)
e2e.EnvNoError(s.t, env, err)
err = val.Get(&workflowState)
err = val.Get(&state)
e2e.EnvNoError(s.t, env, err)

return workflowState
return state
}

getFlowStatus := func() protos.FlowStatus {
48 changes: 45 additions & 3 deletions flow/e2e/postgres/qrep_flow_pg_test.go
@@ -75,8 +75,11 @@ func SetupSuite(t *testing.T) PeerFlowE2ETestSuitePG {
func (s PeerFlowE2ETestSuitePG) setupSourceTable(tableName string, rowCount int) {
err := e2e.CreateTableForQRep(s.Conn(), s.suffix, tableName)
require.NoError(s.t, err)
err = e2e.PopulateSourceTable(s.Conn(), s.suffix, tableName, rowCount)
require.NoError(s.t, err)

if rowCount > 0 {
err = e2e.PopulateSourceTable(s.Conn(), s.suffix, tableName, rowCount)
require.NoError(s.t, err)
}
}

func (s PeerFlowE2ETestSuitePG) comparePGTables(srcSchemaQualified, dstSchemaQualified, selector string) error {
@@ -256,7 +259,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Complete_QRep_Flow_Multi_Insert_PG() {
require.NoError(s.t, err)
}

func (s PeerFlowE2ETestSuitePG) Test_Setup_Destination_And_PeerDB_Columns_QRep_PG() {
func (s PeerFlowE2ETestSuitePG) Test_PeerDB_Columns_QRep_PG() {
tc := e2e.NewTemporalClient(s.t)

numRows := 10
@@ -297,3 +300,42 @@ func (s PeerFlowE2ETestSuitePG) Test_Setup_Destination_And_PeerDB_Columns_QRep_PG() {
err = s.checkSyncedAt(dstSchemaQualified)
require.NoError(s.t, err)
}

func (s PeerFlowE2ETestSuitePG) Test_No_Rows_QRep_PG() {
tc := e2e.NewTemporalClient(s.t)

numRows := 0

srcTable := "test_no_rows_qrep_pg_1"
s.setupSourceTable(srcTable, numRows)

dstTable := "test_no_rows_qrep_pg_2"

srcSchemaQualified := fmt.Sprintf("%s_%s.%s", "e2e_test", s.suffix, srcTable)
dstSchemaQualified := fmt.Sprintf("%s_%s.%s", "e2e_test", s.suffix, dstTable)

query := fmt.Sprintf("SELECT * FROM e2e_test_%s.%s WHERE updated_at BETWEEN {{.start}} AND {{.end}}",
s.suffix, srcTable)

postgresPeer := e2e.GeneratePostgresPeer()

qrepConfig, err := e2e.CreateQRepWorkflowConfig(
"test_no_rows_qrep_pg",
srcSchemaQualified,
dstSchemaQualified,
query,
postgresPeer,
"",
true,
"_PEERDB_SYNCED_AT",
)
require.NoError(s.t, err)

env := e2e.RunQrepFlowWorkflow(tc, qrepConfig)

// Verify workflow completes without error
require.True(s.t, env.IsWorkflowCompleted())

err = env.GetWorkflowError()
require.NoError(s.t, err)
}
6 changes: 6 additions & 0 deletions flow/model/qvalue/avro_converter.go
@@ -456,6 +456,12 @@ func (c *QValueAvroConverter) processGoDate() (interface{}, error) {
return nil, errors.New("invalid Time value for Date")
}

// BigQuery will not allow a Date earlier than 1 AD or later than 9999 AD,
// so make such Dates null
if DisallowedTimestamp(c.TargetDWH, t, c.logger) {
return nil, nil
}

// Snowflake has issues with avro timestamp types, returning as string form
// See: https://stackoverflow.com/questions/66104762/snowflake-date-column-have-incorrect-date-from-avro-file
if c.TargetDWH == QDWHTypeSnowflake {
4 changes: 2 additions & 2 deletions flow/model/qvalue/timestamp.go
@@ -9,8 +9,8 @@ import (
// BigQuery will not allow a timestamp earlier than 1 AD or later than 9999 AD
func DisallowedTimestamp(dwh QDWHType, t time.Time, logger log.Logger) bool {
if dwh == QDWHTypeBigQuery {
tMicro := t.UnixMicro()
if tMicro < 0 || tMicro > 253402300799999999 { // 9999-12-31 23:59:59.999999
year := t.Year()
if year < 1 || year > 9999 {
logger.Warn("Nulling Timestamp value for BigQuery as it exceeds allowed range",
"timestamp", t.String())
return true
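The swap from a UnixMicro bound to a calendar-year bound matters for pre-1970 values: any timestamp before the Unix epoch has a negative microsecond count, so the old check nulled it even though BigQuery accepts it, which is exactly what the new 'medieval' (year 1534) column in the BigQuery test exercises. A quick standalone check of the two bounds:

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	medieval := time.Date(1534, time.March, 14, 23, 5, 52, 0, time.UTC)

	// Old bound: microseconds since the Unix epoch. Anything before 1970 is
	// negative, so this value was flagged and nulled even though it is valid.
	tMicro := medieval.UnixMicro()
	fmt.Println(tMicro < 0 || tMicro > 253402300799999999) // true (wrongly out of range)

	// New bound from DisallowedTimestamp: only years outside 1..9999 are flagged.
	year := medieval.Year()
	fmt.Println(year < 1 || year > 9999) // false (kept)
}
```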
16 changes: 16 additions & 0 deletions flow/model/signals.go
@@ -130,6 +130,22 @@ var CDCDynamicPropertiesSignal = TypedSignal[*protos.CDCFlowConfigUpdate]{
Name: "cdc-dynamic-properties",
}

var SyncStopSignal = TypedSignal[struct{}]{
Name: "sync-stop",
}

var SyncErrorSignal = TypedSignal[string]{
Name: "sync-error",
}

var SyncResultSignal = TypedSignal[SyncResponse]{
Name: "sync-result",
}

var SyncOptionsSignal = TypedSignal[*protos.SyncFlowOptions]{
Name: "sync-options",
}

var NormalizeSignal = TypedSignal[NormalizePayload]{
Name: "normalize",
}
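The four new signals give the sync flow a typed name/payload pair for stopping, reporting errors, returning results, and pushing updated options. The sketch below shows generic Temporal plumbing for consuming one of them inside a workflow; the TypedSignal shape here is a hypothetical minimal stand-in for the real definition in flow/model/signals.go, and waitForSyncError is illustrative, not a PeerDB function:

```go
package signalsketch

import (
	"go.temporal.io/sdk/workflow"
)

// Hypothetical minimal shape: the real TypedSignal may carry more than the
// name; here it only ties a signal name to its payload type T.
type TypedSignal[T any] struct {
	Name string
}

var SyncErrorSignal = TypedSignal[string]{
	Name: "sync-error",
}

// waitForSyncError blocks until the sync-error signal arrives and returns its
// string payload, using the plain Temporal SDK signal-channel API.
func waitForSyncError(ctx workflow.Context) string {
	ch := workflow.GetSignalChannel(ctx, SyncErrorSignal.Name)
	var msg string
	ch.Receive(ctx, &msg)
	return msg
}
```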
