Skip to content

Commit

Permalink
flow tests ain't flowing debug pt.3 + unfix
Browse files Browse the repository at this point in the history
  • Loading branch information
heavycrystal committed Feb 3, 2024
1 parent 661080d commit cf547a3
Show file tree
Hide file tree
Showing 4 changed files with 62 additions and 48 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/flow.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ jobs:
flow_test:
strategy:
matrix:
runner: [ubicloud-standard-16-ubuntu-2204-arm, ubuntu-latest]
runner: [ubicloud-standard-16-ubuntu-2204-arm]
runs-on: ${{ matrix.runner }}
timeout-minutes: 30
services:
Expand Down
22 changes: 7 additions & 15 deletions flow/connectors/postgres/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,6 @@ type PostgresCDCSource struct {
// for storing chema delta audit logs to catalog
catalogPool *pgxpool.Pool
flowJobName string

signaledAsNonEmpty bool
}

type PostgresCDCConfig struct {
Expand Down Expand Up @@ -89,7 +87,6 @@ func NewPostgresCDCSource(cdcConfig *PostgresCDCConfig, customTypeMap map[uint32
logger: *slog.With(slog.String(string(shared.FlowNameKey), flowName)),
catalogPool: cdcConfig.CatalogPool,
flowJobName: cdcConfig.FlowJobName,
signaledAsNonEmpty: false,
}, nil
}

Expand Down Expand Up @@ -236,21 +233,16 @@ func (p *PostgresCDCSource) consumeStream(
nextStandbyMessageDeadline := time.Now().Add(standbyMessageTimeout)

addRecordWithKey := func(key *model.TableWithPkey, rec model.Record) error {
if key != nil {
err := cdcRecordsStorage.Set(*key, rec)
if err != nil {
return err
}
err := cdcRecordsStorage.Set(key, rec)
if err != nil {
return err
}
records.AddRecord(rec)

// we are not storing in case of replident full
// so don't tie signalling to length of storage
if !p.signaledAsNonEmpty {
if cdcRecordsStorage.Len() == 1 {
records.SignalAsNotEmpty()
nextStandbyMessageDeadline = time.Now().Add(standbyMessageTimeout)
p.logger.Info(fmt.Sprintf("pushing the standby deadline to %s", nextStandbyMessageDeadline))
p.signaledAsNonEmpty = true
}
return nil
}
Expand Down Expand Up @@ -556,7 +548,7 @@ func (p *PostgresCDCSource) processInsertMessage(
}

// log lsn and relation id for debugging
p.logger.Debug(fmt.Sprintf("InsertMessage => LSN: %d, RelationID: %d, Relation Name: %s",
p.logger.Info(fmt.Sprintf("InsertMessage => LSN: %d, RelationID: %d, Relation Name: %s",
lsn, relID, tableName))

rel, ok := p.relationMessageMapping[relID]
Expand Down Expand Up @@ -591,7 +583,7 @@ func (p *PostgresCDCSource) processUpdateMessage(
}

// log lsn and relation id for debugging
p.logger.Debug(fmt.Sprintf("UpdateMessage => LSN: %d, RelationID: %d, Relation Name: %s",
p.logger.Info(fmt.Sprintf("UpdateMessage => LSN: %d, RelationID: %d, Relation Name: %s",
lsn, relID, tableName))

rel, ok := p.relationMessageMapping[relID]
Expand Down Expand Up @@ -634,7 +626,7 @@ func (p *PostgresCDCSource) processDeleteMessage(
}

// log lsn and relation id for debugging
p.logger.Debug(fmt.Sprintf("DeleteMessage => LSN: %d, RelationID: %d, Relation Name: %s",
p.logger.Info(fmt.Sprintf("DeleteMessage => LSN: %d, RelationID: %d, Relation Name: %s",
lsn, relID, tableName))

rel, ok := p.relationMessageMapping[relID]
Expand Down
58 changes: 30 additions & 28 deletions flow/connectors/utils/cdc_records/cdc_records_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,37 +72,39 @@ func (c *cdcRecordsStore) initPebbleDB() error {
return nil
}

func (c *cdcRecordsStore) Set(key model.TableWithPkey, rec model.Record) error {
_, ok := c.inMemoryRecords[key]
if ok || len(c.inMemoryRecords) < c.numRecordsSwitchThreshold {
c.inMemoryRecords[key] = rec
} else {
if c.pebbleDB == nil {
slog.Info(fmt.Sprintf("more than %d primary keys read, spilling to disk",
c.numRecordsSwitchThreshold),
slog.String(string(shared.FlowNameKey), c.flowJobName))
err := c.initPebbleDB()
func (c *cdcRecordsStore) Set(key *model.TableWithPkey, rec model.Record) error {
if key != nil {
_, ok := c.inMemoryRecords[*key]
if ok || len(c.inMemoryRecords) < c.numRecordsSwitchThreshold {
c.inMemoryRecords[*key] = rec
} else {
if c.pebbleDB == nil {
slog.Info(fmt.Sprintf("more than %d primary keys read, spilling to disk",
c.numRecordsSwitchThreshold),
slog.String(string(shared.FlowNameKey), c.flowJobName))
err := c.initPebbleDB()
if err != nil {
return err
}
}

encodedKey, err := encVal(key)
if err != nil {
return err
}
}

encodedKey, err := encVal(key)
if err != nil {
return err
}
// necessary to point pointer to interface so the interface is exposed
// instead of the underlying type
encodedRec, err := encVal(&rec)
if err != nil {
return err
}
// we're using Pebble as a cache, no need for durability here.
err = c.pebbleDB.Set(encodedKey, encodedRec, &pebble.WriteOptions{
Sync: false,
})
if err != nil {
return fmt.Errorf("unable to store value in Pebble: %w", err)
// necessary to point pointer to interface so the interface is exposed
// instead of the underlying type
encodedRec, err := encVal(&rec)
if err != nil {
return err
}
// we're using Pebble as a cache, no need for durability here.
err = c.pebbleDB.Set(encodedKey, encodedRec, &pebble.WriteOptions{
Sync: false,
})
if err != nil {
return fmt.Errorf("unable to store value in Pebble: %w", err)
}
}
}
c.numRecords++
Expand Down
28 changes: 24 additions & 4 deletions flow/connectors/utils/cdc_records/cdc_records_storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func TestSingleRecord(t *testing.T) {
cdcRecordsStore.numRecordsSwitchThreshold = 10

key, rec := genKeyAndRec(t)
err := cdcRecordsStore.Set(key, rec)
err := cdcRecordsStore.Set(&key, rec)
require.NoError(t, err)
// should not spill into DB
require.Len(t, cdcRecordsStore.inMemoryRecords, 1)
Expand All @@ -103,15 +103,15 @@ func TestRecordsTillSpill(t *testing.T) {
// add records upto set limit
for i := 1; i <= 10; i++ {
key, rec := genKeyAndRec(t)
err := cdcRecordsStore.Set(key, rec)
err := cdcRecordsStore.Set(&key, rec)
require.NoError(t, err)
require.Len(t, cdcRecordsStore.inMemoryRecords, i)
require.Nil(t, cdcRecordsStore.pebbleDB)
}

// this record should be spilled to DB
key, rec := genKeyAndRec(t)
err := cdcRecordsStore.Set(key, rec)
err := cdcRecordsStore.Set(&key, rec)
require.NoError(t, err)
_, ok := cdcRecordsStore.inMemoryRecords[key]
require.False(t, ok)
Expand All @@ -132,7 +132,7 @@ func TestTimeAndRatEncoding(t *testing.T) {
cdcRecordsStore.numRecordsSwitchThreshold = 0

key, rec := genKeyAndRec(t)
err := cdcRecordsStore.Set(key, rec)
err := cdcRecordsStore.Set(&key, rec)
require.NoError(t, err)

retreived, ok, err := cdcRecordsStore.Get(key)
Expand All @@ -145,3 +145,23 @@ func TestTimeAndRatEncoding(t *testing.T) {

require.NoError(t, cdcRecordsStore.Close())
}

func TestNullKeyDoesntStore(t *testing.T) {
t.Parallel()

cdcRecordsStore := NewCDCRecordsStore("test_time_encoding")
cdcRecordsStore.numRecordsSwitchThreshold = 0

key, rec := genKeyAndRec(t)
err := cdcRecordsStore.Set(nil, rec)
require.NoError(t, err)

retreived, ok, err := cdcRecordsStore.Get(key)
require.Nil(t, retreived)
require.NoError(t, err)
require.False(t, ok)

require.Equal(t, 1, cdcRecordsStore.Len())

require.NoError(t, cdcRecordsStore.Close())
}

0 comments on commit cf547a3

Please sign in to comment.