Skip to content

Commit

Permalink
tests and lint
Browse files Browse the repository at this point in the history
  • Loading branch information
Amogh-Bharadwaj committed Dec 18, 2023
1 parent 6252da8 commit f8b608d
Show file tree
Hide file tree
Showing 5 changed files with 200 additions and 16 deletions.
16 changes: 5 additions & 11 deletions flow/connectors/bigquery/merge_statement_generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,21 +137,15 @@ func (m *mergeStmtGenerator) generateMergeStmt(tempTable string, peerdbCols *pro
backtickColNames = append(backtickColNames, fmt.Sprintf("`%s`", colName))
pureColNames = append(pureColNames, colName)
}
// append synced_at column
colsArray := append(backtickColNames,
fmt.Sprintf("`%s`", strings.ToUpper(peerdbCols.SyncedAtColName)),
)
valuesArray := append(backtickColNames, "CURRENT_TIMESTAMP")
insertColumnsSQL := strings.Join(colsArray, ", ")
// fill in synced_at column
insertValuesSQL := strings.Join(valuesArray, ", ")
csep := strings.Join(backtickColNames, ", ")
insertColumnsSQL := csep + fmt.Sprintf(", `%s`", peerdbCols.SyncedAtColName)
insertValuesSQL := csep + ",CURRENT_TIMESTAMP"

updateStatementsforToastCols := m.generateUpdateStatements(pureColNames,
m.UnchangedToastColumns, peerdbCols)
if m.peerdbCols.SoftDelete {
softDeleteInsertColumnsSQL := strings.Join(append(colsArray,
m.peerdbCols.SoftDeleteColName), ",")
softDeleteInsertValuesSQL := strings.Join(append(valuesArray, "TRUE"), ",")
softDeleteInsertColumnsSQL := insertColumnsSQL + fmt.Sprintf(", `%s`", peerdbCols.SoftDeleteColName)
softDeleteInsertValuesSQL := insertValuesSQL + ", TRUE"

updateStatementsforToastCols = append(updateStatementsforToastCols,
fmt.Sprintf("WHEN NOT MATCHED AND (_peerdb_deduped._PEERDB_RECORD_TYPE = 2) THEN INSERT (%s) VALUES(%s)",
Expand Down
10 changes: 6 additions & 4 deletions flow/connectors/postgres/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -536,7 +536,8 @@ func (c *PostgresConnector) getTableNametoUnchangedCols(flowJobName string, sync

func (c *PostgresConnector) generateNormalizeStatements(destinationTableIdentifier string,
unchangedToastColumns []string, rawTableIdentifier string, supportsMerge bool,
peerdbCols *protos.PeerDBColumns) []string {
peerdbCols *protos.PeerDBColumns,
) []string {
if supportsMerge {
return []string{c.generateMergeStatement(destinationTableIdentifier, unchangedToastColumns,
rawTableIdentifier, peerdbCols)}
Expand All @@ -547,7 +548,8 @@ func (c *PostgresConnector) generateNormalizeStatements(destinationTableIdentifi
}

func (c *PostgresConnector) generateFallbackStatements(destinationTableIdentifier string,
rawTableIdentifier string, peerdbCols *protos.PeerDBColumns) []string {
rawTableIdentifier string, peerdbCols *protos.PeerDBColumns,
) []string {
normalizedTableSchema := c.tableSchemaMapping[destinationTableIdentifier]
columnNames := make([]string, 0, len(normalizedTableSchema.Columns))
flattenedCastsSQLArray := make([]string, 0, len(normalizedTableSchema.Columns))
Expand Down Expand Up @@ -642,7 +644,6 @@ func (c *PostgresConnector) generateMergeStatement(
flattenedCastsSQL := strings.TrimSuffix(strings.Join(flattenedCastsSQLArray, ","), ",")
insertValuesSQLArray := make([]string, 0, len(columnNames))
for _, columnName := range columnNames {
fmt.Println("MERGE columnName: ", columnName)
insertValuesSQLArray = append(insertValuesSQLArray, fmt.Sprintf("src.%s", columnName))
}

Expand Down Expand Up @@ -693,7 +694,8 @@ func (c *PostgresConnector) generateMergeStatement(
}

func (c *PostgresConnector) generateUpdateStatement(allCols []string,
unchangedToastColsLists []string, peerdbCols *protos.PeerDBColumns) []string {
unchangedToastColsLists []string, peerdbCols *protos.PeerDBColumns,
) []string {
updateStmts := make([]string, 0, len(unchangedToastColsLists))

for _, cols := range unchangedToastColsLists {
Expand Down
104 changes: 103 additions & 1 deletion flow/e2e/bigquery/peer_flow_bq_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"time"

"github.com/PeerDB-io/peer-flow/e2e"
"github.com/PeerDB-io/peer-flow/model/qvalue"
"github.com/PeerDB-io/peer-flow/shared"
peerflow "github.com/PeerDB-io/peer-flow/workflows"
"github.com/jackc/pgx/v5/pgxpool"
Expand Down Expand Up @@ -51,6 +52,42 @@ func (s PeerFlowE2ETestSuiteBQ) attachSuffix(input string) string {
return fmt.Sprintf("%s_%s", input, s.bqSuffix)
}

func (s *PeerFlowE2ETestSuiteBQ) checkPeerdbColumns(dstSchemaQualified string, rowID int8) error {
query := fmt.Sprintf("SELECT `_PEERDB_IS_DELETED`,`_PEERDB_SYNCED_AT` FROM %s WHERE id = %d",
dstSchemaQualified, rowID)

recordBatch, err := s.bqHelper.ExecuteAndProcessQuery(query)
if err != nil {
return err
}

recordCount := 0
for _, record := range recordBatch.Records {
for _, entry := range record.Entries {
if entry.Kind == qvalue.QValueKindBoolean {
isDeleteVal, ok := entry.Value.(bool)
if !(ok && isDeleteVal) {
return fmt.Errorf("peerdb column failed: _PEERDB_IS_DELETED is not true")
}
recordCount += 1
}

if entry.Kind == qvalue.QValueKindTimestamp {
_, ok := entry.Value.(time.Time)
if !ok {
return fmt.Errorf("peerdb column failed: _PEERDB_SYNCED_AT is not valid")
}
recordCount += 1
}
}
}
if recordCount != 2 {
return fmt.Errorf("peerdb column failed: _PEERDB_IS_DELETED or _PEERDB_SYNCED_AT not present")
}

return nil
}

// setupBigQuery sets up the bigquery connection.
func setupBigQuery(t *testing.T) *BigQueryTestHelper {
bqHelper, err := NewBigQueryTestHelper()
Expand Down Expand Up @@ -419,7 +456,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Nochanges_BQ() {
// allow only continue as new error
require.Contains(s.t, err.Error(), "continue as new")

s.compareTableContentsBQ(dstTableName, "id,t1,t2,k")
s.compareTableContentsBQ(dstTableName, "id,t1,t2,k,_PEERDB_IS_DELETED, _PEERDB_SYNCED_AT")
env.AssertExpectations(s.t)
<-done
}
Expand Down Expand Up @@ -1095,3 +1132,68 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_2_BQ() {

env.AssertExpectations(s.t)
}

func (s *PeerFlowE2ETestSuiteBQ) Test_PeerDB_Columns() {
env := e2e.NewTemporalTestWorkflowEnvironment()
e2e.RegisterWorkflowsAndActivities(env, s.t)

srcTableName := s.attachSchemaSuffix("test_peerdb_cols")
dstTableName := s.attachSchemaSuffix("test_peerdb_cols_dst")

_, err := s.pool.Exec(context.Background(), fmt.Sprintf(`
CREATE TABLE IF NOT EXISTS %s (
id SERIAL PRIMARY KEY,
key TEXT NOT NULL,
value TEXT NOT NULL
);
`, srcTableName))
require.NoError(s.t, err)

connectionGen := e2e.FlowConnectionGenerationConfig{
FlowJobName: s.attachSuffix("test_peerdb_cols_mirror"),
TableNameMapping: map[string]string{srcTableName: dstTableName},
PostgresPort: e2e.PostgresPort,
Destination: s.bqHelper.Peer,
}

flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs()
require.NoError(s.t, err)

limits := peerflow.CDCFlowLimits{
ExitAfterRecords: 2,
MaxBatchSize: 100,
}

go func() {
e2e.SetupCDCFlowStatusQuery(env, connectionGen)
// insert 1 row into the source table
testKey := fmt.Sprintf("test_key_%d", 1)
testValue := fmt.Sprintf("test_value_%d", 1)
_, err = s.pool.Exec(context.Background(), fmt.Sprintf(`
INSERT INTO %s(key, value) VALUES ($1, $2)
`, srcTableName), testKey, testValue)
require.NoError(s.t, err)

// delete that row
_, err = s.pool.Exec(context.Background(), fmt.Sprintf(`
DELETE FROM %s WHERE id=1
`, srcTableName))
require.NoError(s.t, err)
fmt.Println("Inserted 10 rows into the source table")
}()

env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil)

// Verify workflow completes without error
s.True(env.IsWorkflowCompleted())
err = env.GetWorkflowError()

// allow only continue as new error
s.Error(err)
require.Contains(s.t, err.Error(), "continue as new")

err = s.checkPeerdbColumns(dstTableName, 1)
require.NoError(s.t, err)

env.AssertExpectations(s.t)
}
1 change: 1 addition & 0 deletions flow/e2e/congen.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,7 @@ func (c *FlowConnectionGenerationConfig) GenerateFlowConnectionConfigs() (*proto
ret.Source = GeneratePostgresPeer(c.PostgresPort)
ret.Destination = c.Destination
ret.CdcStagingPath = c.CdcStagingPath
ret.SoftDelete = true
ret.SoftDeleteColName = "_PEERDB_IS_DELETED"
ret.SyncedAtColName = "_PEERDB_SYNCED_AT"
return ret, nil
Expand Down
85 changes: 85 additions & 0 deletions flow/e2e/postgres/peer_flow_pg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/model/qvalue"
peerflow "github.com/PeerDB-io/peer-flow/workflows"
"github.com/jackc/pgx/v5/pgtype"
)

func (s *PeerFlowE2ETestSuitePG) attachSchemaSuffix(tableName string) string {
Expand All @@ -18,6 +19,27 @@ func (s *PeerFlowE2ETestSuitePG) attachSuffix(input string) string {
return fmt.Sprintf("%s_%s", input, postgresSuffix)
}

func (s *PeerFlowE2ETestSuitePG) checkPeerdbColumns(dstSchemaQualified string, rowID int8) error {
query := fmt.Sprintf(`SELECT "_PEERDB_IS_DELETED","_PEERDB_SYNCED_AT" FROM %s WHERE id = %d`,
dstSchemaQualified, rowID)
var isDeleted pgtype.Bool
var syncedAt pgtype.Timestamp
err := s.pool.QueryRow(context.Background(), query).Scan(&isDeleted, &syncedAt)
if err != nil {
return fmt.Errorf("failed to query row: %w", err)
}

if !isDeleted.Bool {
return fmt.Errorf("isDeleted is not true")
}

if !syncedAt.Valid {
return fmt.Errorf("syncedAt is not valid")
}

return nil
}

func (s *PeerFlowE2ETestSuitePG) Test_Simple_Flow_PG() {
env := s.NewTestWorkflowEnvironment()
e2e.RegisterWorkflowsAndActivities(env, s.T())
Expand Down Expand Up @@ -474,3 +496,66 @@ func (s *PeerFlowE2ETestSuitePG) Test_Composite_PKey_Toast_2_PG() {

env.AssertExpectations(s.T())
}

func (s *PeerFlowE2ETestSuitePG) Test_PeerDB_Columns() {
env := s.NewTestWorkflowEnvironment()
e2e.RegisterWorkflowsAndActivities(env, s.T())

srcTableName := s.attachSchemaSuffix("test_peerdb_cols")
dstTableName := s.attachSchemaSuffix("test_peerdb_cols_dst")

_, err := s.pool.Exec(context.Background(), fmt.Sprintf(`
CREATE TABLE IF NOT EXISTS %s (
id SERIAL PRIMARY KEY,
key TEXT NOT NULL,
value TEXT NOT NULL
);
`, srcTableName))
s.NoError(err)

connectionGen := e2e.FlowConnectionGenerationConfig{
FlowJobName: s.attachSuffix("test_peerdb_cols_mirror"),
TableNameMapping: map[string]string{srcTableName: dstTableName},
PostgresPort: e2e.PostgresPort,
Destination: s.peer,
}

flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs()
s.NoError(err)

limits := peerflow.CDCFlowLimits{
ExitAfterRecords: 2,
MaxBatchSize: 100,
}

go func() {
e2e.SetupCDCFlowStatusQuery(env, connectionGen)
// insert 1 row into the source table
testKey := fmt.Sprintf("test_key_%d", 1)
testValue := fmt.Sprintf("test_value_%d", 1)
_, err = s.pool.Exec(context.Background(), fmt.Sprintf(`
INSERT INTO %s(key, value) VALUES ($1, $2)
`, srcTableName), testKey, testValue)
s.NoError(err)

// delete that row
_, err = s.pool.Exec(context.Background(), fmt.Sprintf(`
DELETE FROM %s WHERE id=1
`, srcTableName))
s.NoError(err)
fmt.Println("Inserted and deleted a row for peerdb column check")
}()

env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil)

// Verify workflow completes without error
s.True(env.IsWorkflowCompleted())

err = env.GetWorkflowError()
// allow only continue as new error
s.Error(err)
s.Contains(err.Error(), "continue as new")
checkErr := s.checkPeerdbColumns(dstTableName, 1)
s.NoError(checkErr)
env.AssertExpectations(s.T())
}

0 comments on commit f8b608d

Please sign in to comment.