diff --git a/flow/e2e/clickhouse/clickhouse.go b/flow/e2e/clickhouse/clickhouse.go index a001b1434a..f9d9654bca 100644 --- a/flow/e2e/clickhouse/clickhouse.go +++ b/flow/e2e/clickhouse/clickhouse.go @@ -10,49 +10,49 @@ import ( "github.com/stretchr/testify/require" "github.com/PeerDB-io/peer-flow/connectors" - "github.com/PeerDB-io/peer-flow/connectors/clickhouse" + connclickhouse "github.com/PeerDB-io/peer-flow/connectors/clickhouse" connpostgres "github.com/PeerDB-io/peer-flow/connectors/postgres" "github.com/PeerDB-io/peer-flow/e2e" - "github.com/PeerDB-io/peer-flow/e2e/s3" + e2e_s3 "github.com/PeerDB-io/peer-flow/e2e/s3" "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/PeerDB-io/peer-flow/model" "github.com/PeerDB-io/peer-flow/model/qvalue" "github.com/PeerDB-io/peer-flow/shared" ) -type ClickHouseSuite struct { +type PeerFlowE2ETestSuiteCH struct { t *testing.T conn *connpostgres.PostgresConnector s3Helper *e2e_s3.S3TestHelper suffix string } -func (s ClickHouseSuite) T() *testing.T { +func (s PeerFlowE2ETestSuiteCH) T() *testing.T { return s.t } -func (s ClickHouseSuite) Connector() *connpostgres.PostgresConnector { +func (s PeerFlowE2ETestSuiteCH) Connector() *connpostgres.PostgresConnector { return s.conn } -func (s ClickHouseSuite) DestinationConnector() connectors.Connector { +func (s PeerFlowE2ETestSuiteCH) DestinationConnector() connectors.Connector { // TODO have CH connector return nil } -func (s ClickHouseSuite) Conn() *pgx.Conn { +func (s PeerFlowE2ETestSuiteCH) Conn() *pgx.Conn { return s.Connector().Conn() } -func (s ClickHouseSuite) Suffix() string { +func (s PeerFlowE2ETestSuiteCH) Suffix() string { return s.suffix } -func (s ClickHouseSuite) Peer() *protos.Peer { +func (s PeerFlowE2ETestSuiteCH) Peer() *protos.Peer { return s.PeerForDatabase("e2e_test_" + s.suffix) } -func (s ClickHouseSuite) PeerForDatabase(dbname string) *protos.Peer { +func (s PeerFlowE2ETestSuiteCH) PeerForDatabase(dbname string) *protos.Peer { ret := 
&protos.Peer{ Name: e2e.AddSuffix(s, dbname), Type: protos.DBType_CLICKHOUSE, @@ -74,16 +74,16 @@ func (s ClickHouseSuite) PeerForDatabase(dbname string) *protos.Peer { return ret } -func (s ClickHouseSuite) DestinationTable(table string) string { +func (s PeerFlowE2ETestSuiteCH) DestinationTable(table string) string { return table } -func (s ClickHouseSuite) Teardown() { +func (s PeerFlowE2ETestSuiteCH) Teardown() { require.NoError(s.t, s.s3Helper.CleanUp(context.Background())) e2e.TearDownPostgres(s) } -func (s ClickHouseSuite) GetRows(table string, cols string) (*model.QRecordBatch, error) { +func (s PeerFlowE2ETestSuiteCH) GetRows(table string, cols string) (*model.QRecordBatch, error) { ch, err := connclickhouse.Connect(context.Background(), s.Peer().GetClickhouseConfig()) if err != nil { return nil, err @@ -145,7 +145,7 @@ func (s ClickHouseSuite) GetRows(table string, cols string) (*model.QRecordBatch return batch, rows.Err() } -func SetupSuite(t *testing.T) ClickHouseSuite { +func SetupSuite(t *testing.T) PeerFlowE2ETestSuiteCH { t.Helper() suffix := "ch_" + strings.ToLower(shared.RandomString(8)) @@ -155,7 +155,7 @@ func SetupSuite(t *testing.T) ClickHouseSuite { s3Helper, err := e2e_s3.NewS3TestHelper(false) require.NoError(t, err, "failed to setup S3") - s := ClickHouseSuite{ + s := PeerFlowE2ETestSuiteCH{ t: t, conn: conn, suffix: suffix, diff --git a/flow/e2e/clickhouse/peer_flow_ch_test.go b/flow/e2e/clickhouse/peer_flow_ch_test.go new file mode 100644 index 0000000000..33e2007bf0 --- /dev/null +++ b/flow/e2e/clickhouse/peer_flow_ch_test.go @@ -0,0 +1,70 @@ +package e2e_clickhouse + +import ( + "context" + "fmt" + "testing" + + "github.com/PeerDB-io/peer-flow/e2e" + "github.com/PeerDB-io/peer-flow/e2eshared" + peerflow "github.com/PeerDB-io/peer-flow/workflows" + "github.com/stretchr/testify/require" +) + +func TestPeerFlowE2ETestSuiteCH(t *testing.T) { + e2eshared.RunSuite(t, SetupSuite) +} + +func (s PeerFlowE2ETestSuiteCH) 
attachSchemaSuffix(tableName string) string { + return fmt.Sprintf("e2e_test_%s.%s", s.suffix, tableName) +} + +func (s PeerFlowE2ETestSuiteCH) attachSuffix(input string) string { + return fmt.Sprintf("%s_%s", input, s.suffix) +} + +func (s PeerFlowE2ETestSuiteCH) Test_Simple_CDC_CH() { + testName := "test_cdc_ch_simple" + srcTableName := s.attachSchemaSuffix(testName) + dstTableName := testName + + _, err := s.Conn().Exec(context.Background(), fmt.Sprintf(` + CREATE TABLE IF NOT EXISTS %s ( + id SERIAL NOT NULL, + key INTEGER NOT NULL, + value TEXT NOT NULL + ); + `, srcTableName)) + require.NoError(s.t, err) + + connectionGen := e2e.FlowConnectionGenerationConfig{ + FlowJobName: s.attachSuffix(testName), + TableNameMapping: map[string]string{srcTableName: dstTableName}, + Destination: s.Peer().Name, + } + + flowConnConfig := connectionGen.GenerateFlowConnectionConfigs(s.t) + flowConnConfig.MaxBatchSize = 100 + flowConnConfig.SoftDeleteColName = "" + flowConnConfig.SyncedAtColName = "_peerdb_synced_at" + + tc := e2e.NewTemporalClient(s.t) + env := e2e.ExecutePeerflow(tc, peerflow.CDCFlowWorkflow, flowConnConfig, nil) + + e2e.SetupCDCFlowStatusQuery(s.t, env, flowConnConfig) + // insert 10 rows into the source table + for i := range 10 { + testKey := i + testValue := fmt.Sprintf("test_value_%d", i) + _, err = s.Conn().Exec(context.Background(), + fmt.Sprintf("INSERT INTO %s(key, value) VALUES ($1, $2)", srcTableName), + testKey, testValue) + e2e.EnvNoError(s.t, env, err) + } + s.t.Log("Inserted 10 rows into the source table") + + e2e.EnvWaitForEqualTables(env, s, "normalize inserts", dstTableName, "id,key,value") + + env.Cancel() + e2e.RequireEnvCanceled(s.t, env) +} diff --git a/flow/e2e/clickhouse/qrep_flow_ch_test.go b/flow/e2e/clickhouse/qrep_flow_ch_test.go new file mode 100644 index 0000000000..7058c3bf73 --- /dev/null +++ b/flow/e2e/clickhouse/qrep_flow_ch_test.go @@ -0,0 +1,62 @@ +package e2e_clickhouse + +import ( + "context" + "fmt" + "strings" + 
"time" + + "github.com/PeerDB-io/peer-flow/e2e" + "github.com/stretchr/testify/require" +) + +func (s PeerFlowE2ETestSuiteCH) setupSimpleTable(tableName string) { + tblFields := []string{ + "id SERIAL PRIMARY KEY", + "key INTEGER NOT NULL", + "value TEXT NOT NULL", + "updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP", + } + + tblFieldStr := strings.Join(tblFields, ",") + _, err := s.Conn().Exec(context.Background(), fmt.Sprintf(` + CREATE TABLE e2e_test_%s.%s ( + %s + );`, s.suffix, tableName, tblFieldStr)) + + require.NoError(s.t, err) + + var rows []string + row := `(1, 1, 'test_value_1', CURRENT_TIMESTAMP)` + rows = append(rows, row) + + _, err = s.Conn().Exec(context.Background(), fmt.Sprintf( + `INSERT INTO e2e_test_%s.%s (id, key, value, updated_at) VALUES %s;`, + s.suffix, tableName, strings.Join(rows, ","))) + require.NoError(s.t, err) +} + +func (s PeerFlowE2ETestSuiteCH) Test_Append_QRep_CH() { + tc := e2e.NewTemporalClient(s.t) + + tblName := "test_append_ch_qrep" + s.setupSimpleTable(tblName) + + query := fmt.Sprintf("SELECT * FROM e2e_test_%s.%s WHERE updated_at BETWEEN {{.start}} AND {{.end}}", + s.suffix, tblName) + + qrepConfig := e2e.CreateQRepWorkflowConfig(s.t, "test_append_ch_qrep", + fmt.Sprintf("e2e_test_%s.%s", s.suffix, tblName), + tblName, + query, + s.Peer().Name, + "", + true, + "_PEERDB_SYNCED_AT", + "") + env := e2e.RunQRepFlowWorkflow(tc, qrepConfig) + e2e.EnvWaitForFinished(s.t, env, 3*time.Minute) + require.NoError(s.t, env.Error()) + + e2e.RequireEqualTables(s, tblName, "*") +}