From cf547a309e00279b84f078d5d5980706f5f35360 Mon Sep 17 00:00:00 2001
From: Kevin Biju
Date: Sat, 3 Feb 2024 17:12:28 +0530
Subject: [PATCH] flow tests ain't flowing debug pt.3 + unfix

---
 .github/workflows/flow.yml                    |  2 +-
 flow/connectors/postgres/cdc.go               | 22 +++----
 .../utils/cdc_records/cdc_records_storage.go  | 58 ++++++++++---------
 .../cdc_records/cdc_records_storage_test.go   | 28 +++++++--
 4 files changed, 62 insertions(+), 48 deletions(-)

diff --git a/.github/workflows/flow.yml b/.github/workflows/flow.yml
index 306b4932ee..99b9e99c90 100644
--- a/.github/workflows/flow.yml
+++ b/.github/workflows/flow.yml
@@ -10,7 +10,7 @@ jobs:
   flow_test:
     strategy:
       matrix:
-        runner: [ubicloud-standard-16-ubuntu-2204-arm, ubuntu-latest]
+        runner: [ubicloud-standard-16-ubuntu-2204-arm]
     runs-on: ${{ matrix.runner }}
     timeout-minutes: 30
     services:
diff --git a/flow/connectors/postgres/cdc.go b/flow/connectors/postgres/cdc.go
index 5e8d3b8d81..36d04d1ef7 100644
--- a/flow/connectors/postgres/cdc.go
+++ b/flow/connectors/postgres/cdc.go
@@ -44,8 +44,6 @@ type PostgresCDCSource struct {
 	// for storing chema delta audit logs to catalog
 	catalogPool *pgxpool.Pool
 	flowJobName string
-
-	signaledAsNonEmpty bool
 }
 
 type PostgresCDCConfig struct {
@@ -89,7 +87,6 @@ func NewPostgresCDCSource(cdcConfig *PostgresCDCConfig, customTypeMap map[uint32
 		logger: *slog.With(slog.String(string(shared.FlowNameKey), flowName)),
 		catalogPool: cdcConfig.CatalogPool,
 		flowJobName: cdcConfig.FlowJobName,
-		signaledAsNonEmpty: false,
 	}, nil
 }
 
@@ -236,21 +233,16 @@ func (p *PostgresCDCSource) consumeStream(
 	nextStandbyMessageDeadline := time.Now().Add(standbyMessageTimeout)
 
 	addRecordWithKey := func(key *model.TableWithPkey, rec model.Record) error {
-		if key != nil {
-			err := cdcRecordsStorage.Set(*key, rec)
-			if err != nil {
-				return err
-			}
+		err := cdcRecordsStorage.Set(key, rec)
+		if err != nil {
+			return err
 		}
 		records.AddRecord(rec)
 
-		// we are not storing in case of replident full
-		// so don't tie signalling to length of storage
-		if !p.signaledAsNonEmpty {
+		if cdcRecordsStorage.Len() == 1 {
 			records.SignalAsNotEmpty()
 			nextStandbyMessageDeadline = time.Now().Add(standbyMessageTimeout)
 			p.logger.Info(fmt.Sprintf("pushing the standby deadline to %s", nextStandbyMessageDeadline))
-			p.signaledAsNonEmpty = true
 		}
 		return nil
 	}
@@ -556,7 +548,7 @@ func (p *PostgresCDCSource) processInsertMessage(
 	}
 
 	// log lsn and relation id for debugging
-	p.logger.Debug(fmt.Sprintf("InsertMessage => LSN: %d, RelationID: %d, Relation Name: %s",
+	p.logger.Info(fmt.Sprintf("InsertMessage => LSN: %d, RelationID: %d, Relation Name: %s",
 		lsn, relID, tableName))
 
 	rel, ok := p.relationMessageMapping[relID]
@@ -591,7 +583,7 @@ func (p *PostgresCDCSource) processUpdateMessage(
 	}
 
 	// log lsn and relation id for debugging
-	p.logger.Debug(fmt.Sprintf("UpdateMessage => LSN: %d, RelationID: %d, Relation Name: %s",
+	p.logger.Info(fmt.Sprintf("UpdateMessage => LSN: %d, RelationID: %d, Relation Name: %s",
 		lsn, relID, tableName))
 
 	rel, ok := p.relationMessageMapping[relID]
@@ -634,7 +626,7 @@ func (p *PostgresCDCSource) processDeleteMessage(
 	}
 
 	// log lsn and relation id for debugging
-	p.logger.Debug(fmt.Sprintf("DeleteMessage => LSN: %d, RelationID: %d, Relation Name: %s",
+	p.logger.Info(fmt.Sprintf("DeleteMessage => LSN: %d, RelationID: %d, Relation Name: %s",
 		lsn, relID, tableName))
 
 	rel, ok := p.relationMessageMapping[relID]
diff --git a/flow/connectors/utils/cdc_records/cdc_records_storage.go b/flow/connectors/utils/cdc_records/cdc_records_storage.go
index a1bde8d614..c44892c2c0 100644
--- a/flow/connectors/utils/cdc_records/cdc_records_storage.go
+++ b/flow/connectors/utils/cdc_records/cdc_records_storage.go
@@ -72,37 +72,39 @@ func (c *cdcRecordsStore) initPebbleDB() error {
 	return nil
 }
 
-func (c *cdcRecordsStore) Set(key model.TableWithPkey, rec model.Record) error {
-	_, ok := c.inMemoryRecords[key]
-	if ok || len(c.inMemoryRecords) < c.numRecordsSwitchThreshold {
-		c.inMemoryRecords[key] = rec
-	} else {
-		if c.pebbleDB == nil {
-			slog.Info(fmt.Sprintf("more than %d primary keys read, spilling to disk",
-				c.numRecordsSwitchThreshold),
-				slog.String(string(shared.FlowNameKey), c.flowJobName))
-			err := c.initPebbleDB()
+func (c *cdcRecordsStore) Set(key *model.TableWithPkey, rec model.Record) error {
+	if key != nil {
+		_, ok := c.inMemoryRecords[*key]
+		if ok || len(c.inMemoryRecords) < c.numRecordsSwitchThreshold {
+			c.inMemoryRecords[*key] = rec
+		} else {
+			if c.pebbleDB == nil {
+				slog.Info(fmt.Sprintf("more than %d primary keys read, spilling to disk",
+					c.numRecordsSwitchThreshold),
+					slog.String(string(shared.FlowNameKey), c.flowJobName))
+				err := c.initPebbleDB()
+				if err != nil {
+					return err
+				}
+			}
+
+			encodedKey, err := encVal(key)
 			if err != nil {
 				return err
 			}
-		}
-
-		encodedKey, err := encVal(key)
-		if err != nil {
-			return err
-		}
-		// necessary to point pointer to interface so the interface is exposed
-		// instead of the underlying type
-		encodedRec, err := encVal(&rec)
-		if err != nil {
-			return err
-		}
-		// we're using Pebble as a cache, no need for durability here.
-		err = c.pebbleDB.Set(encodedKey, encodedRec, &pebble.WriteOptions{
-			Sync: false,
-		})
-		if err != nil {
-			return fmt.Errorf("unable to store value in Pebble: %w", err)
+			// necessary to point pointer to interface so the interface is exposed
+			// instead of the underlying type
+			encodedRec, err := encVal(&rec)
+			if err != nil {
+				return err
+			}
+			// we're using Pebble as a cache, no need for durability here.
+			err = c.pebbleDB.Set(encodedKey, encodedRec, &pebble.WriteOptions{
+				Sync: false,
+			})
+			if err != nil {
+				return fmt.Errorf("unable to store value in Pebble: %w", err)
+			}
 		}
 	}
 	c.numRecords++
diff --git a/flow/connectors/utils/cdc_records/cdc_records_storage_test.go b/flow/connectors/utils/cdc_records/cdc_records_storage_test.go
index 9f0b7a9f6e..c585541054 100644
--- a/flow/connectors/utils/cdc_records/cdc_records_storage_test.go
+++ b/flow/connectors/utils/cdc_records/cdc_records_storage_test.go
@@ -81,7 +81,7 @@ func TestSingleRecord(t *testing.T) {
 	cdcRecordsStore.numRecordsSwitchThreshold = 10
 
 	key, rec := genKeyAndRec(t)
-	err := cdcRecordsStore.Set(key, rec)
+	err := cdcRecordsStore.Set(&key, rec)
 	require.NoError(t, err)
 	// should not spill into DB
 	require.Len(t, cdcRecordsStore.inMemoryRecords, 1)
@@ -103,7 +103,7 @@ func TestRecordsTillSpill(t *testing.T) {
 	// add records upto set limit
 	for i := 1; i <= 10; i++ {
 		key, rec := genKeyAndRec(t)
-		err := cdcRecordsStore.Set(key, rec)
+		err := cdcRecordsStore.Set(&key, rec)
 		require.NoError(t, err)
 		require.Len(t, cdcRecordsStore.inMemoryRecords, i)
 		require.Nil(t, cdcRecordsStore.pebbleDB)
@@ -111,7 +111,7 @@ func TestRecordsTillSpill(t *testing.T) {
 
 	// this record should be spilled to DB
 	key, rec := genKeyAndRec(t)
-	err := cdcRecordsStore.Set(key, rec)
+	err := cdcRecordsStore.Set(&key, rec)
 	require.NoError(t, err)
 	_, ok := cdcRecordsStore.inMemoryRecords[key]
 	require.False(t, ok)
@@ -132,7 +132,7 @@ func TestTimeAndRatEncoding(t *testing.T) {
 	cdcRecordsStore.numRecordsSwitchThreshold = 0
 
 	key, rec := genKeyAndRec(t)
-	err := cdcRecordsStore.Set(key, rec)
+	err := cdcRecordsStore.Set(&key, rec)
 	require.NoError(t, err)
 
 	retreived, ok, err := cdcRecordsStore.Get(key)
@@ -145,3 +145,23 @@ func TestTimeAndRatEncoding(t *testing.T) {
 
 	require.NoError(t, cdcRecordsStore.Close())
 }
+
+func TestNullKeyDoesntStore(t *testing.T) {
+	t.Parallel()
+
+	cdcRecordsStore := NewCDCRecordsStore("test_time_encoding")
+	cdcRecordsStore.numRecordsSwitchThreshold = 0
+
+	key, rec := genKeyAndRec(t)
+	err := cdcRecordsStore.Set(nil, rec)
+	require.NoError(t, err)
+
+	retreived, ok, err := cdcRecordsStore.Get(key)
+	require.Nil(t, retreived)
+	require.NoError(t, err)
+	require.False(t, ok)
+
+	require.Equal(t, 1, cdcRecordsStore.Len())
+
+	require.NoError(t, cdcRecordsStore.Close())
+}
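Note on the behavioral change in this patch: cdcRecordsStore.Set now takes a *model.TableWithPkey, a nil key (e.g. a row from a table with REPLICA IDENTITY FULL, where no primary-key lookup is possible) is counted by Len() but never stored, and consumeStream signals the batch as non-empty when Len() == 1 instead of tracking a separate signaledAsNonEmpty flag. The standalone Go sketch below only illustrates that counting pattern; toyStore and its fields are hypothetical stand-ins, not the real cdcRecordsStore.

package main

import "fmt"

// toyStore is a hypothetical stand-in for cdcRecordsStore: every record it is
// handed counts toward Len(), but only non-nil keys are actually retained.
type toyStore struct {
	records    map[string]string
	numRecords int
}

func newToyStore() *toyStore {
	return &toyStore{records: make(map[string]string)}
}

// Set mirrors the patched semantics: a nil key still increments the counter,
// so the caller can tie its "not empty" signal to Len() == 1.
func (s *toyStore) Set(key *string, value string) {
	if key != nil {
		s.records[*key] = value
	}
	s.numRecords++
}

func (s *toyStore) Len() int { return s.numRecords }

func main() {
	store := newToyStore()

	// First record arrives without a usable key: nothing is stored...
	store.Set(nil, "full-replident row")
	// ...but Len() == 1, so the not-empty signal can fire exactly once.
	if store.Len() == 1 {
		fmt.Println("signalling not-empty, records so far:", store.Len())
	}

	key := "table1|pk=42"
	store.Set(&key, "keyed row")
	fmt.Println("stored keyed rows:", len(store.records), "total records:", store.Len())
}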