diff --git a/docker-compose-dev.yml b/docker-compose-dev.yml index 8a8181ba61..dfd74c0554 100644 --- a/docker-compose-dev.yml +++ b/docker-compose-dev.yml @@ -215,4 +215,4 @@ services: - flow-api volumes: - pgdata: + pgdata: \ No newline at end of file diff --git a/flow/connectors/clickhouse/cdc.go b/flow/connectors/clickhouse/cdc.go index 31d46b9a47..153fd0b8df 100644 --- a/flow/connectors/clickhouse/cdc.go +++ b/flow/connectors/clickhouse/cdc.go @@ -90,7 +90,7 @@ func (c *ClickhouseConnector) syncRecordsViaAvro( DestinationTableIdentifier: strings.ToLower(rawTableIdentifier), } avroSyncer := NewClickhouseAvroSyncMethod(qrepConfig, c) - destinationTableSchema, err := c.getTableSchema(qrepConfig.DestinationTableIdentifier) + destinationTableSchema, err := c.GetTableSchema(qrepConfig.DestinationTableIdentifier) if err != nil { return nil, err } diff --git a/flow/connectors/clickhouse/client.go b/flow/connectors/clickhouse/client.go index 8bd5a0221e..47c524c217 100644 --- a/flow/connectors/clickhouse/client.go +++ b/flow/connectors/clickhouse/client.go @@ -28,7 +28,7 @@ func NewClickhouseClient(ctx context.Context, config *protos.ClickhouseConfig) ( } genericExecutor := *peersql.NewGenericSQLQueryExecutor( - ctx, database, clickhouseTypeToQValueKindMap, qvalue.QValueKindToSnowflakeTypeMap) + ctx, database, clickhouseTypeToQValueKindMap, qvalue.QValueKindToClickhouseTypeMap) return &ClickhouseClient{ GenericSQLQueryExecutor: genericExecutor, diff --git a/flow/connectors/clickhouse/qrep.go b/flow/connectors/clickhouse/qrep.go index 50e1b74a46..c99d5a9c5f 100644 --- a/flow/connectors/clickhouse/qrep.go +++ b/flow/connectors/clickhouse/qrep.go @@ -24,6 +24,8 @@ func (c *ClickhouseConnector) SyncQRepRecords( partition *protos.QRepPartition, stream *model.QRecordStream, ) (int, error) { + + fmt.Printf("\n *********************............SyncQRepRecords 1") // Ensure the destination table is available. 
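+	// Its column types are read below via a zero-row SELECT (see GetTableSchema).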
destTable := config.DestinationTableIdentifier flowLog := slog.Group("sync_metadata", @@ -41,7 +43,7 @@ func (c *ClickhouseConnector) SyncQRepRecords( return 0, nil } - tblSchema, err := c.getTableSchema(destTable) + tblSchema, err := c.GetTableSchema(destTable) if err != nil { return 0, fmt.Errorf("failed to get schema of table %s: %w", destTable, err) } @@ -76,7 +78,7 @@ func (c *ClickhouseConnector) createMetadataInsertStatement( return insertMetadataStmt, nil } -func (c *ClickhouseConnector) getTableSchema(tableName string) ([]*sql.ColumnType, error) { +func (c *ClickhouseConnector) GetTableSchema(tableName string) ([]*sql.ColumnType, error) { //nolint:gosec queryString := fmt.Sprintf(`SELECT * FROM %s LIMIT 0`, tableName) //nolint:rowserrcheck diff --git a/flow/connectors/clickhouse/qrep_avro_sync.go b/flow/connectors/clickhouse/qrep_avro_sync.go index c32b23ecf7..9b79491c7e 100644 --- a/flow/connectors/clickhouse/qrep_avro_sync.go +++ b/flow/connectors/clickhouse/qrep_avro_sync.go @@ -119,6 +119,8 @@ func (s *ClickhouseAvroSyncMethod) SyncQRepRecords( return 0, err } + fmt.Printf("\n*********************************............SyncQRepRecords 2 avroFile %+v\n", avroFile) + s3o, err := utils.NewS3BucketAndPrefix(stagingPath) if err != nil { return 0, err diff --git a/flow/connectors/sql/query_executor.go b/flow/connectors/sql/query_executor.go index 58e90b6894..0f25da07d9 100644 --- a/flow/connectors/sql/query_executor.go +++ b/flow/connectors/sql/query_executor.go @@ -107,6 +107,7 @@ func (g *GenericSQLQueryExecutor) CreateTable(schema *model.QRecordSchema, schem fields := make([]string, 0, len(schema.Fields)) for _, field := range schema.Fields { dbType, ok := g.qvalueKindToDBType[field.Type] + //fmt.Printf("\n***********fieldType: %s, dbType: %s\n", field.Type, dbType) if !ok { return fmt.Errorf("unsupported qvalue type %s", field.Type) } @@ -115,6 +116,9 @@ func (g *GenericSQLQueryExecutor) CreateTable(schema *model.QRecordSchema, schem command := fmt.Sprintf("CREATE TABLE %s.%s (%s)", schemaName, tableName, strings.Join(fields, ", ")) + if strings.Contains(tableName, "_ch_") { + command += " ENGINE = ReplacingMergeTree() ORDER BY id" + } _, err := g.db.ExecContext(g.ctx, command) if err != nil { return fmt.Errorf("failed to create table: %w", err) diff --git a/flow/e2e/clickhouse/ch.json b/flow/e2e/clickhouse/ch.json new file mode 100644 index 0000000000..ef1b88bed3 --- /dev/null +++ b/flow/e2e/clickhouse/ch.json @@ -0,0 +1,11 @@ +{ + "host": "localhost", + "port": 9000, + "user": "clickhouse", + "password": "clickhouse", + "database": "desti", + "s3_path": "s3://peerdb-test-bucket", + "access_key_id":"", + "secret_access_key": "", + "region": "us-east-2" +} \ No newline at end of file diff --git a/flow/e2e/clickhouse/clickhouse_helper.go b/flow/e2e/clickhouse/clickhouse_helper.go new file mode 100644 index 0000000000..419826cdaa --- /dev/null +++ b/flow/e2e/clickhouse/clickhouse_helper.go @@ -0,0 +1,201 @@ +package e2e_clickhouse + +import ( + "context" + "encoding/json" + "fmt" + "math/big" + "os" + "time" + + connclickhouse "github.com/PeerDB-io/peer-flow/connectors/clickhouse" + "github.com/PeerDB-io/peer-flow/e2eshared" + "github.com/PeerDB-io/peer-flow/generated/protos" + "github.com/PeerDB-io/peer-flow/model" + "github.com/PeerDB-io/peer-flow/model/qvalue" + "github.com/PeerDB-io/peer-flow/shared" +) + +type ClickhouseTestHelper struct { + // config is the Clickhouse config. 
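+	// It is loaded from the JSON file named by the TEST_CH_CREDS env var (see flow/e2e/clickhouse/ch.json).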
+	Config *protos.ClickhouseConfig
+	// Peer is the ClickHouse peer wrapping Config.
+	Peer *protos.Peer
+	// connection to another database, to manage the test database
+	adminClient *connclickhouse.ClickhouseClient
+	// connection to the test database
+	testClient *connclickhouse.ClickhouseClient
+	// testSchemaName is the schema to use for testing.
+	testSchemaName string
+	// testDatabaseName is the database used for testing.
+	testDatabaseName string
+}
+
+func NewClickhouseTestHelper() (*ClickhouseTestHelper, error) {
+	jsonPath := os.Getenv("TEST_CH_CREDS")
+	if jsonPath == "" {
+		return nil, fmt.Errorf("TEST_CH_CREDS env var not set")
+	}
+
+	content, err := e2eshared.ReadFileToBytes(jsonPath)
+	if err != nil {
+		return nil, fmt.Errorf("failed to read file: %w", err)
+	}
+
+	var config *protos.ClickhouseConfig
+	err = json.Unmarshal(content, &config)
+	if err != nil {
+		return nil, fmt.Errorf("failed to unmarshal json: %w", err)
+	}
+
+	peer := generateCHPeer(config)
+	runID, err := shared.RandomUInt64()
+	if err != nil {
+		return nil, fmt.Errorf("failed to generate random uint64: %w", err)
+	}
+
+	testDatabaseName := fmt.Sprintf("e2e_test_%d", runID)
+
+	adminClient, err := connclickhouse.NewClickhouseClient(context.Background(), config)
+	if err != nil {
+		return nil, fmt.Errorf("failed to create Clickhouse admin client: %w", err)
+	}
+	err = adminClient.ExecuteQuery(fmt.Sprintf("CREATE DATABASE %s", testDatabaseName))
+	if err != nil {
+		return nil, fmt.Errorf("failed to create Clickhouse test database: %w", err)
+	}
+
+	config.Database = testDatabaseName
+	testClient, err := connclickhouse.NewClickhouseClient(context.Background(), config)
+	if err != nil {
+		return nil, fmt.Errorf("failed to create Clickhouse test client: %w", err)
+	}
+
+	return &ClickhouseTestHelper{
+		Config:      config,
+		Peer:        peer,
+		adminClient: adminClient,
+		testClient:  testClient,
+		//testSchemaName: "PUBLIC",
+		testSchemaName:   testDatabaseName,
+		testDatabaseName: testDatabaseName,
+	}, nil
+}
+
+func generateCHPeer(clickhouseConfig *protos.ClickhouseConfig) *protos.Peer {
+	ret := &protos.Peer{}
+	ret.Name = "test_ch_peer"
+	ret.Type = protos.DBType_CLICKHOUSE
+
+	ret.Config = &protos.Peer_ClickhouseConfig{
+		ClickhouseConfig: clickhouseConfig,
+	}
+
+	return ret
+}
+
+// Cleanup closes both clients and drops the test database.
+func (s *ClickhouseTestHelper) Cleanup() error {
+	err := s.testClient.Close()
+	if err != nil {
+		return err
+	}
+	err = s.adminClient.ExecuteQuery(fmt.Sprintf("DROP DATABASE %s", s.testDatabaseName))
+	if err != nil {
+		return err
+	}
+	return s.adminClient.Close()
+}
+
+// RunCommand runs the given command.
+func (s *ClickhouseTestHelper) RunCommand(command string) error {
+	return s.testClient.ExecuteQuery(command)
+}
+
+// CountRows returns the number of rows in the given table.
+func (s *ClickhouseTestHelper) CountRows(tableName string) (int, error) {
+	res, err := s.testClient.CountRows(s.testSchemaName, tableName)
+	if err != nil {
+		return 0, err
+	}
+
+	return int(res), nil
+}
+
+// CountNonNullRows returns the number of rows in the given table whose given column is not NULL.
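+// Tests use it to assert that unparseable values (e.g. invalid geometries) land as NULL on the destination.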
+func (s *ClickhouseTestHelper) CountNonNullRows(tableName string, columnName string) (int, error) {
+	res, err := s.testClient.CountNonNullRows(s.testSchemaName, tableName, columnName)
+	if err != nil {
+		return 0, err
+	}
+
+	return int(res), nil
+}
+
+func (s *ClickhouseTestHelper) CheckNull(tableName string, colNames []string) (bool, error) {
+	return s.testClient.CheckNull(s.testSchemaName, tableName, colNames)
+}
+
+func (s *ClickhouseTestHelper) ExecuteAndProcessQuery(query string) (*model.QRecordBatch, error) {
+	return s.testClient.ExecuteAndProcessQuery(query)
+}
+
+func (s *ClickhouseTestHelper) CreateTable(tableName string, schema *model.QRecordSchema) error {
+	return s.testClient.CreateTable(schema, s.testSchemaName, tableName)
+}
+
+// runs a query that returns an int result
+func (s *ClickhouseTestHelper) RunIntQuery(query string) (int, error) {
+	rows, err := s.testClient.ExecuteAndProcessQuery(query)
+	if err != nil {
+		return 0, err
+	}
+
+	numRecords := 0
+	if rows == nil || len(rows.Records) != 1 {
+		if rows != nil {
+			numRecords = len(rows.Records)
+		}
+		return 0, fmt.Errorf("failed to execute query: %s, returned %d != 1 rows", query, numRecords)
+	}
+
+	rec := rows.Records[0]
+	if len(rec) != 1 {
+		return 0, fmt.Errorf("failed to execute query: %s, returned %d != 1 columns", query, len(rec))
+	}
+
+	switch rec[0].Kind {
+	case qvalue.QValueKindInt32:
+		return int(rec[0].Value.(int32)), nil
+	case qvalue.QValueKindInt64:
+		return int(rec[0].Value.(int64)), nil
+	case qvalue.QValueKindNumeric:
+		// get big.Rat and convert to int
+		rat := rec[0].Value.(*big.Rat)
+		return int(rat.Num().Int64() / rat.Denom().Int64()), nil
+	default:
+		return 0, fmt.Errorf("failed to execute query: %s, returned value of type %s", query, rec[0].Kind)
+	}
+}
+
+// checkSyncedAt runs a query over the synced-at column and verifies every value is a valid timestamp
+func (s *ClickhouseTestHelper) checkSyncedAt(query string) error {
+	recordBatch, err := s.testClient.ExecuteAndProcessQuery(query)
+	if err != nil {
+		return err
+	}
+
+	for _, record := range recordBatch.Records {
+		for _, entry := range record {
+			if entry.Kind != qvalue.QValueKindTimestamp {
+				return fmt.Errorf("synced_at column check failed: _PEERDB_SYNCED_AT is not timestamp")
+			}
+			_, ok := entry.Value.(time.Time)
+			if !ok {
+				return fmt.Errorf("synced_at column check failed: _PEERDB_SYNCED_AT is not a valid timestamp")
+			}
+		}
+	}
+
+	return nil
+}
diff --git a/flow/e2e/clickhouse/clickhouse_schema_delta_test.go b/flow/e2e/clickhouse/clickhouse_schema_delta_test.go
new file mode 100644
index 0000000000..4a57409efd
--- /dev/null
+++ b/flow/e2e/clickhouse/clickhouse_schema_delta_test.go
@@ -0,0 +1,307 @@
+package e2e_clickhouse
+
+import (
+	"context"
+	"testing"
+
+	"github.com/stretchr/testify/require"
+
+	connclickhouse "github.com/PeerDB-io/peer-flow/connectors/clickhouse"
+	"github.com/PeerDB-io/peer-flow/e2eshared"
+)
+
+const schemaDeltaTestSchemaName = "PUBLIC"
+
+type ClickhouseSchemaDeltaTestSuite struct {
+	t *testing.T
+
+	connector    *connclickhouse.ClickhouseConnector
+	chTestHelper *ClickhouseTestHelper
+}
+
+func setupSchemaDeltaSuite(t *testing.T) ClickhouseSchemaDeltaTestSuite {
+	t.Helper()
+
+	chTestHelper, err := NewClickhouseTestHelper()
+	if err != nil {
+		t.Fatalf("failed to create Clickhouse test helper: %v", err)
+	}
+
+	connector, err := connclickhouse.NewClickhouseConnector(
+		context.Background(),
+		chTestHelper.Config,
+	)
+	if err != nil {
+		t.Fatalf("failed to create Clickhouse connector: %v", err)
+	}
+
+	return ClickhouseSchemaDeltaTestSuite{
+		t:            t,
+		connector:    connector,
+		chTestHelper: chTestHelper,
+	}
+}
+
+// func
(s ClickhouseSchemaDeltaTestSuite) TestSimpleAddColumn() { +// tableName := fmt.Sprintf("%s.SIMPLE_ADD_COLUMN", schemaDeltaTestSchemaName) +// err := s.chTestHelper.RunCommand(fmt.Sprintf("CREATE TABLE %s(ID TEXT PRIMARY KEY)", tableName)) +// require.NoError(s.t, err) + +// err = s.connector.ReplayTableSchemaDeltas("schema_delta_flow", []*protos.TableSchemaDelta{{ +// SrcTableName: tableName, +// DstTableName: tableName, +// AddedColumns: []*protos.DeltaAddedColumn{{ +// ColumnName: "HI", +// ColumnType: string(qvalue.QValueKindJSON), +// }}, +// }}) +// require.NoError(s.t, err) + +// output, err := s.connector.GetTableSchema(tableName) +// require.NoError(s.t, err) +// require.Equal(s.t, &protos.TableSchema{ +// TableIdentifier: tableName, +// Columns: []*protos.FieldDescription{ +// { +// Name: "ID", +// Type: string(qvalue.QValueKindString), +// TypeModifier: -1, +// }, +// { +// Name: "HI", +// Type: string(qvalue.QValueKindJSON), +// TypeModifier: -1, +// }, +// }, +// }, output.TableNameSchemaMapping[tableName]) +// } + +// func (s ClickhouseSchemaDeltaTestSuite) TestAddAllColumnTypes() { +// tableName := fmt.Sprintf("%s.ADD_DROP_ALL_COLUMN_TYPES", schemaDeltaTestSchemaName) +// err := s.chTestHelper.RunCommand(fmt.Sprintf("CREATE TABLE %s(ID TEXT PRIMARY KEY)", tableName)) +// require.NoError(s.t, err) + +// expectedTableSchema := &protos.TableSchema{ +// TableIdentifier: tableName, +// Columns: []*protos.FieldDescription{ +// { +// Name: "ID", +// Type: string(qvalue.QValueKindString), +// TypeModifier: -1, +// }, +// { +// Name: "C1", +// Type: string(qvalue.QValueKindBoolean), +// TypeModifier: -1, +// }, +// { +// Name: "C2", +// Type: string(qvalue.QValueKindBytes), +// TypeModifier: -1, +// }, +// { +// Name: "C3", +// Type: string(qvalue.QValueKindDate), +// TypeModifier: -1, +// }, +// { +// Name: "C4", +// Type: string(qvalue.QValueKindFloat64), +// TypeModifier: -1, +// }, +// { +// Name: "C5", +// Type: string(qvalue.QValueKindJSON), +// TypeModifier: -1, +// }, +// { +// Name: "C6", +// Type: string(qvalue.QValueKindNumeric), + +// TypeModifier: -1, +// }, +// { +// Name: "C7", +// Type: string(qvalue.QValueKindString), +// TypeModifier: -1, +// }, +// { +// Name: "C8", +// Type: string(qvalue.QValueKindTime), +// TypeModifier: -1, +// }, +// { +// Name: "C9", +// Type: string(qvalue.QValueKindTimestamp), +// TypeModifier: -1, +// }, +// { +// Name: "C10", +// Type: string(qvalue.QValueKindTimestampTZ), +// TypeModifier: -1, +// }, +// }, +// } +// addedColumns := make([]*protos.DeltaAddedColumn, 0) +// for _, column := range expectedTableSchema.Columns { +// if column.Name != "ID" { +// addedColumns = append(addedColumns, &protos.DeltaAddedColumn{ +// ColumnName: column.Name, +// ColumnType: column.Type, +// }) +// } +// } + +// err = s.connector.ReplayTableSchemaDeltas("schema_delta_flow", []*protos.TableSchemaDelta{{ +// SrcTableName: tableName, +// DstTableName: tableName, +// AddedColumns: addedColumns, +// }}) +// require.NoError(s.t, err) + +// output, err := s.connector.GetTableSchema(&protos.GetTableSchemaBatchInput{ +// TableIdentifiers: []string{tableName}, +// }) +// require.NoError(s.t, err) +// require.Equal(s.t, expectedTableSchema, output.TableNameSchemaMapping[tableName]) +// } + +// func (s ClickhouseSchemaDeltaTestSuite) TestAddTrickyColumnNames() { +// tableName := fmt.Sprintf("%s.ADD_DROP_TRICKY_COLUMN_NAMES", schemaDeltaTestSchemaName) +// err := s.chTestHelper.RunCommand(fmt.Sprintf("CREATE TABLE %s(id TEXT PRIMARY KEY)", tableName)) +// 
require.NoError(s.t, err) + +// expectedTableSchema := &protos.TableSchema{ +// TableIdentifier: tableName, +// Columns: []*protos.FieldDescription{ +// { +// Name: "ID", +// Type: string(qvalue.QValueKindString), +// TypeModifier: -1, +// }, +// { +// Name: "C1", +// Type: string(qvalue.QValueKindString), +// TypeModifier: -1, +// }, +// { +// Name: "C 1", +// Type: string(qvalue.QValueKindString), +// TypeModifier: -1, +// }, +// { +// Name: "RIGHT", +// Type: string(qvalue.QValueKindString), +// TypeModifier: -1, +// }, +// { +// Name: "SELECT", +// Type: string(qvalue.QValueKindString), +// TypeModifier: -1, +// }, +// { +// Name: "XMIN", +// Type: string(qvalue.QValueKindString), +// TypeModifier: -1, +// }, +// { +// Name: "CARIÑO", +// Type: string(qvalue.QValueKindString), +// TypeModifier: -1, +// }, +// { +// Name: "±ªÞ³§", +// Type: string(qvalue.QValueKindString), +// TypeModifier: -1, +// }, +// { +// Name: "カラム", +// Type: string(qvalue.QValueKindString), +// TypeModifier: -1, +// }, +// }, +// } +// addedColumns := make([]*protos.DeltaAddedColumn, 0) +// for _, column := range expectedTableSchema.Columns { +// if column.Name != "ID" { +// addedColumns = append(addedColumns, &protos.DeltaAddedColumn{ +// ColumnName: column.Name, +// ColumnType: column.Type, +// }) +// } +// } + +// err = s.connector.ReplayTableSchemaDeltas("schema_delta_flow", []*protos.TableSchemaDelta{{ +// SrcTableName: tableName, +// DstTableName: tableName, +// AddedColumns: addedColumns, +// }}) +// require.NoError(s.t, err) + +// output, err := s.connector.GetTableSchema(&protos.GetTableSchemaBatchInput{ +// TableIdentifiers: []string{tableName}, +// }) +// require.NoError(s.t, err) +// require.Equal(s.t, expectedTableSchema, output.TableNameSchemaMapping[tableName]) +// } + +// func (s ClickhouseSchemaDeltaTestSuite) TestAddWhitespaceColumnNames() { +// tableName := fmt.Sprintf("%s.ADD_DROP_WHITESPACE_COLUMN_NAMES", schemaDeltaTestSchemaName) +// err := s.chTestHelper.RunCommand(fmt.Sprintf("CREATE TABLE %s(\" \" TEXT PRIMARY KEY)", tableName)) +// require.NoError(s.t, err) + +// expectedTableSchema := &protos.TableSchema{ +// TableIdentifier: tableName, +// Columns: []*protos.FieldDescription{ +// { +// Name: " ", +// Type: string(qvalue.QValueKindString), +// TypeModifier: -1, +// }, +// { +// Name: " ", +// Type: string(qvalue.QValueKindString), +// TypeModifier: -1, +// }, +// { +// Name: " ", +// Type: string(qvalue.QValueKindTime), +// TypeModifier: -1, +// }, +// { +// Name: "\t", +// Type: string(qvalue.QValueKindDate), +// TypeModifier: -1, +// }, +// }, +// } +// addedColumns := make([]*protos.DeltaAddedColumn, 0) +// for _, column := range expectedTableSchema.Columns { +// if column.Name != " " { +// addedColumns = append(addedColumns, &protos.DeltaAddedColumn{ +// ColumnName: column.Name, +// ColumnType: column.Type, +// }) +// } +// } + +// err = s.connector.ReplayTableSchemaDeltas("schema_delta_flow", []*protos.TableSchemaDelta{{ +// SrcTableName: tableName, +// DstTableName: tableName, +// AddedColumns: addedColumns, +// }}) +// require.NoError(s.t, err) + +// output, err := s.connector.GetTableSchema(&protos.GetTableSchemaBatchInput{ +// TableIdentifiers: []string{tableName}, +// }) +// require.NoError(s.t, err) +// require.Equal(s.t, expectedTableSchema, output.TableNameSchemaMapping[tableName]) +// } + +func TestClickhouseSchemaDeltaTestSuite(t *testing.T) { + e2eshared.RunSuite(t, setupSchemaDeltaSuite, func(s ClickhouseSchemaDeltaTestSuite) { + require.NoError(s.t, 
s.chTestHelper.Cleanup()) + require.NoError(s.t, s.connector.Close()) + }) +} diff --git a/flow/e2e/clickhouse/peer_flow_ch_test.go b/flow/e2e/clickhouse/peer_flow_ch_test.go new file mode 100644 index 0000000000..627a743f32 --- /dev/null +++ b/flow/e2e/clickhouse/peer_flow_ch_test.go @@ -0,0 +1,1578 @@ +package e2e_clickhouse + +import ( + "context" + "fmt" + "strings" + "testing" + "time" + + "github.com/jackc/pgx/v5" + "github.com/joho/godotenv" + "github.com/stretchr/testify/require" + + connclickhouse "github.com/PeerDB-io/peer-flow/connectors/clickhouse" + "github.com/PeerDB-io/peer-flow/e2e" + "github.com/PeerDB-io/peer-flow/e2eshared" + "github.com/PeerDB-io/peer-flow/model" + "github.com/PeerDB-io/peer-flow/shared" +) + +type PeerFlowE2ETestSuiteCH struct { + t *testing.T + + pgSuffix string + conn *pgx.Conn + chHelper *ClickhouseTestHelper + connector *connclickhouse.ClickhouseConnector +} + +func (s PeerFlowE2ETestSuiteCH) T() *testing.T { + return s.t +} + +func (s PeerFlowE2ETestSuiteCH) Conn() *pgx.Conn { + return s.conn +} + +func (s PeerFlowE2ETestSuiteCH) Suffix() string { + return s.pgSuffix +} + +func (s PeerFlowE2ETestSuiteCH) GetRows(tableName string, chSelector string) (*model.QRecordBatch, error) { + s.t.Helper() + qualifiedTableName := fmt.Sprintf(`%s.%s`, s.chHelper.testDatabaseName, tableName) + chSelQuery := fmt.Sprintf(`SELECT %s FROM %s ORDER BY id`, chSelector, qualifiedTableName) + s.t.Logf("running query on clickhouse: %s", chSelQuery) + return s.chHelper.ExecuteAndProcessQuery(chSelQuery) +} + +func TestPeerFlowE2ETestSuiteCH(t *testing.T) { + e2eshared.RunSuite(t, SetupSuite, func(s PeerFlowE2ETestSuiteCH) { + e2e.TearDownPostgres(s) + + if s.chHelper != nil { + err := s.chHelper.Cleanup() + if err != nil { + s.t.Fatalf("failed to tear down Clickhouse: %v", err) + } + } + + err := s.connector.Close() + if err != nil { + s.t.Fatalf("failed to close Clickhouse connector: %v", err) + } + }) +} + +func (s PeerFlowE2ETestSuiteCH) attachSchemaSuffix(tableName string) string { + return fmt.Sprintf("e2e_test_%s.%s", s.pgSuffix, tableName) +} + +func (s PeerFlowE2ETestSuiteCH) attachSuffix(input string) string { + return fmt.Sprintf("%s_%s", input, s.pgSuffix) +} + +func SetupSuite(t *testing.T) PeerFlowE2ETestSuiteCH { + t.Helper() + + err := godotenv.Load() + if err != nil { + // it's okay if the .env file is not present + // we will use the default values + t.Log("Unable to load .env file, using default values from env") + } + + suffix := shared.RandomString(8) + tsSuffix := time.Now().Format("20060102150405") + pgSuffix := fmt.Sprintf("ch_%s_%s", strings.ToLower(suffix), tsSuffix) + + conn, err := e2e.SetupPostgres(t, pgSuffix) + if err != nil || conn == nil { + t.Fatalf("failed to setup Postgres: %v", err) + } + + chHelper, err := NewClickhouseTestHelper() + if err != nil { + t.Fatalf("failed to setup Clickhouse: %v", err) + } + + connector, err := connclickhouse.NewClickhouseConnector( + context.Background(), + chHelper.Config, + ) + require.NoError(t, err) + + suite := PeerFlowE2ETestSuiteCH{ + t: t, + pgSuffix: pgSuffix, + conn: conn, + chHelper: chHelper, + connector: connector, + } + + return suite +} + +// func (s PeerFlowE2ETestSuiteCH) Test_Complete_Simple_Flow_CH() { +// env := e2e.NewTemporalTestWorkflowEnvironment(s.t) + +// tableName := "test_simple_flow_ch" +// srcTableName := s.attachSchemaSuffix(tableName) +// dstTableName := fmt.Sprintf("%s.%s", s.chHelper.testSchemaName, tableName) + +// _, err := s.conn.Exec(context.Background(), 
fmt.Sprintf(` +// CREATE TABLE IF NOT EXISTS %s ( +// id SERIAL PRIMARY KEY, +// key TEXT NOT NULL, +// value TEXT NOT NULL +// ); +// `, srcTableName)) +// require.NoError(s.t, err) + +// connectionGen := e2e.FlowConnectionGenerationConfig{ +// FlowJobName: s.attachSuffix(tableName), +// TableNameMapping: map[string]string{srcTableName: dstTableName}, +// PostgresPort: e2e.PostgresPort, +// Destination: s.chHelper.Peer, +// } + +// flowConnConfig := connectionGen.GenerateFlowConnectionConfigs() + +// limits := peerflow.CDCFlowLimits{ +// ExitAfterRecords: -1, +// MaxBatchSize: 100, +// } + +// // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup +// // and then insert 20 rows into the source table +// go func() { +// e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen) +// // insert 20 rows into the source table +// for i := 0; i < 20; i++ { +// testKey := fmt.Sprintf("test_key_%d", i) +// testValue := fmt.Sprintf("test_value_%d", i) +// _, err = s.conn.Exec(context.Background(), fmt.Sprintf(` +// INSERT INTO %s (key, value) VALUES ($1, $2) +// `, srcTableName), testKey, testValue) +// e2e.EnvNoError(s.t, env, err) +// } +// s.t.Log("Inserted 20 rows into the source table") +// e2e.EnvWaitForEqualTables(env, s, "normalize table", tableName, "id,key,value") + +// env.CancelWorkflow() +// }() + +// env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) + +// // check the number of rows where _PEERDB_SYNCED_AT is newer than 5 mins ago +// // it should match the count. +// newerSyncedAtQuery := fmt.Sprintf(` +// SELECT COUNT(*) FROM %s WHERE _PEERDB_SYNCED_AT > CURRENT_TIMESTAMP() - INTERVAL '30 MINUTE' +// `, dstTableName) +// numNewRows, err := s.chHelper.RunIntQuery(newerSyncedAtQuery) +// require.NoError(s.t, err) +// require.Equal(s.t, 20, numNewRows) +// } + +// func (s PeerFlowE2ETestSuiteCH) Test_Flow_ReplicaIdentity_Index_No_Pkey() { +// env := e2e.NewTemporalTestWorkflowEnvironment(s.t) + +// tableName := "test_replica_identity_no_pkey" +// srcTableName := s.attachSchemaSuffix(tableName) +// dstTableName := fmt.Sprintf("%s.%s", s.chHelper.testSchemaName, "test_replica_identity_no_pkey") + +// // Create a table without a primary key and create a named unique index +// _, err := s.conn.Exec(context.Background(), fmt.Sprintf(` +// CREATE TABLE IF NOT EXISTS %s ( +// id SERIAL, +// key TEXT NOT NULL, +// value TEXT NOT NULL +// ); +// CREATE UNIQUE INDEX unique_idx_on_id_key ON %s (id, key); +// ALTER TABLE %s REPLICA IDENTITY USING INDEX unique_idx_on_id_key; +// `, srcTableName, srcTableName, srcTableName)) +// require.NoError(s.t, err) + +// connectionGen := e2e.FlowConnectionGenerationConfig{ +// FlowJobName: s.attachSuffix(tableName), +// TableNameMapping: map[string]string{srcTableName: dstTableName}, +// PostgresPort: e2e.PostgresPort, +// Destination: s.chHelper.Peer, +// } + +// flowConnConfig := connectionGen.GenerateFlowConnectionConfigs() + +// limits := peerflow.CDCFlowLimits{ +// ExitAfterRecords: 20, +// MaxBatchSize: 100, +// } + +// // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup +// // and then insert 20 rows into the source table +// go func() { +// e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen) +// // insert 20 rows into the source table +// for i := 0; i < 20; i++ { +// testKey := fmt.Sprintf("test_key_%d", i) +// testValue := fmt.Sprintf("test_value_%d", i) +// _, err = s.conn.Exec(context.Background(), fmt.Sprintf(` +// INSERT INTO %s (id, key, value) VALUES ($1, $2, $3) +// `, 
srcTableName), i, testKey, testValue) +// e2e.EnvNoError(s.t, env, err) +// } +// s.t.Log("Inserted 20 rows into the source table") +// }() + +// env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) + +// // Verify workflow completes without error +// require.True(s.t, env.IsWorkflowCompleted()) +// err = env.GetWorkflowError() + +// // allow only continue as new error +// require.Contains(s.t, err.Error(), "continue as new") + +// count, err := s.chHelper.CountRows("test_replica_identity_no_pkey") +// require.NoError(s.t, err) +// require.Equal(s.t, 20, count) +// } + +// func (s PeerFlowE2ETestSuiteCH) Test_Invalid_Geo_CH_Avro_CDC() { +// env := e2e.NewTemporalTestWorkflowEnvironment(s.t) + +// tableName := "test_invalid_geo_ch_avro_cdc" +// srcTableName := s.attachSchemaSuffix(tableName) +// dstTableName := fmt.Sprintf("%s.%s", s.chHelper.testSchemaName, "test_invalid_geo_ch_avro_cdc") + +// _, err := s.conn.Exec(context.Background(), fmt.Sprintf(` +// CREATE TABLE IF NOT EXISTS %s ( +// id SERIAL PRIMARY KEY, +// line GEOMETRY(LINESTRING) NOT NULL, +// poly GEOGRAPHY(POLYGON) NOT NULL +// ); +// `, srcTableName)) +// require.NoError(s.t, err) + +// connectionGen := e2e.FlowConnectionGenerationConfig{ +// FlowJobName: s.attachSuffix(tableName), +// TableNameMapping: map[string]string{srcTableName: dstTableName}, +// PostgresPort: e2e.PostgresPort, +// Destination: s.chHelper.Peer, +// } + +// flowConnConfig := connectionGen.GenerateFlowConnectionConfigs() + +// limits := peerflow.CDCFlowLimits{ +// ExitAfterRecords: 10, +// MaxBatchSize: 100, +// } + +// // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup +// // and then insert 10 rows into the source table +// go func() { +// e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen) +// // insert 4 invalid shapes and 6 valid shapes into the source table +// for i := 0; i < 4; i++ { +// _, err = s.conn.Exec(context.Background(), fmt.Sprintf(` +// INSERT INTO %s (line,poly) VALUES ($1,$2) +// `, srcTableName), "010200000001000000000000000000F03F0000000000000040", +// "0103000020e6100000010000000c0000001a8361d35dc64140afdb8d2b1bc3c9bf1b8ed4685fc641405ba64c"+ +// "579dc2c9bf6a6ad95a5fc64140cd82767449c2c9bf9570fbf85ec641408a07944db9c2c9bf729a18a55ec6414021b8b748c7c2c9bfba46de4c"+ +// "5fc64140f2567052abc2c9bf2df9c5925fc641409394e16573c2c9bf2df9c5925fc6414049eceda9afc1c9bfdd1cc1a05fc64140fe43faedebc0"+ +// "c9bf4694f6065fc64140fe43faedebc0c9bfffe7305f5ec641406693d6f2ddc0c9bf1a8361d35dc64140afdb8d2b1bc3c9bf", +// ) +// e2e.EnvNoError(s.t, env, err) +// } +// s.t.Log("Inserted 4 invalid geography rows into the source table") +// for i := 4; i < 10; i++ { +// _, err = s.conn.Exec(context.Background(), fmt.Sprintf(` +// INSERT INTO %s (line,poly) VALUES ($1,$2) +// `, srcTableName), "010200000002000000000000000000F03F000000000000004000000000000008400000000000001040", +// "010300000001000000050000000000000000000000000000000000000000000000"+ +// "00000000000000000000f03f000000000000f03f000000000000f03f0000000000"+ +// "00f03f000000000000000000000000000000000000000000000000") +// e2e.EnvNoError(s.t, env, err) +// } +// s.t.Log("Inserted 6 valid geography rows and 10 total rows into source") +// }() + +// env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) + +// // Verify workflow completes without error +// require.True(s.t, env.IsWorkflowCompleted()) +// err = env.GetWorkflowError() + +// // allow only continue as new error +// require.Contains(s.t, err.Error(), 
"continue as new") + +// // We inserted 4 invalid shapes in each. +// // They should have been filtered out as null on destination +// lineCount, err := s.chHelper.CountNonNullRows("test_invalid_geo_ch_avro_cdc", "line") +// require.NoError(s.t, err) +// require.Equal(s.t, 6, lineCount) + +// polyCount, err := s.chHelper.CountNonNullRows("test_invalid_geo_ch_avro_cdc", "poly") +// require.NoError(s.t, err) +// require.Equal(s.t, 6, polyCount) +// } + +// func (s PeerFlowE2ETestSuiteCH) Test_Toast_CH() { +// env := e2e.NewTemporalTestWorkflowEnvironment(s.t) + +// srcTableName := s.attachSchemaSuffix("test_toast_ch_1") +// dstTableName := fmt.Sprintf("%s.%s", s.chHelper.testSchemaName, "test_toast_ch_1") + +// _, err := s.conn.Exec(context.Background(), fmt.Sprintf(` +// CREATE TABLE IF NOT EXISTS %s ( +// id SERIAL PRIMARY KEY, +// t1 text, +// t2 text, +// k int +// ); +// `, srcTableName)) +// require.NoError(s.t, err) + +// connectionGen := e2e.FlowConnectionGenerationConfig{ +// FlowJobName: s.attachSuffix("test_toast_ch_1"), +// TableNameMapping: map[string]string{srcTableName: dstTableName}, +// PostgresPort: e2e.PostgresPort, +// Destination: s.chHelper.Peer, +// } + +// flowConnConfig := connectionGen.GenerateFlowConnectionConfigs() + +// limits := peerflow.CDCFlowLimits{ +// ExitAfterRecords: -1, +// MaxBatchSize: 100, +// } + +// // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup +// // and execute a transaction touching toast columns +// go func() { +// e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen) +// /* +// Executing a transaction which +// 1. changes both toast column +// 2. changes no toast column +// 2. changes 1 toast column +// */ +// _, err = s.conn.Exec(context.Background(), fmt.Sprintf(` +// BEGIN; +// INSERT INTO %s (t1,t2,k) SELECT random_string(9000),random_string(9000), +// 1 FROM generate_series(1,2); +// UPDATE %s SET k=102 WHERE id=1; +// UPDATE %s SET t1='dummy' WHERE id=2; +// END; +// `, srcTableName, srcTableName, srcTableName)) +// e2e.EnvNoError(s.t, env, err) +// s.t.Log("Executed a transaction touching toast columns") +// e2e.EnvWaitForEqualTables(env, s, "normalizing tx", "test_toast_ch_1", `id,t1,t2,k`) +// env.CancelWorkflow() +// }() + +// env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) +// } + +// func (s PeerFlowE2ETestSuiteCH) Test_Toast_Advance_1_CH() { +// env := e2e.NewTemporalTestWorkflowEnvironment(s.t) + +// srcTableName := s.attachSchemaSuffix("test_toast_ch_3") +// dstTableName := fmt.Sprintf("%s.%s", s.chHelper.testSchemaName, "test_toast_ch_3") + +// _, err := s.conn.Exec(context.Background(), fmt.Sprintf(` +// CREATE TABLE IF NOT EXISTS %s ( +// id SERIAL PRIMARY KEY, +// t1 text, +// t2 text, +// k int +// ); +// `, srcTableName)) +// require.NoError(s.t, err) + +// connectionGen := e2e.FlowConnectionGenerationConfig{ +// FlowJobName: s.attachSuffix("test_toast_ch_3"), +// TableNameMapping: map[string]string{srcTableName: dstTableName}, +// PostgresPort: e2e.PostgresPort, +// Destination: s.chHelper.Peer, +// } + +// flowConnConfig := connectionGen.GenerateFlowConnectionConfigs() + +// limits := peerflow.CDCFlowLimits{ +// ExitAfterRecords: -1, +// MaxBatchSize: 100, +// } + +// // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup +// // and execute a transaction touching toast columns +// go func() { +// e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen) +// // complex transaction with random DMLs on a table with toast columns +// _, err = 
s.conn.Exec(context.Background(), fmt.Sprintf(` +// BEGIN; +// INSERT INTO %s (t1,t2,k) SELECT random_string(9000),random_string(9000), +// 1 FROM generate_series(1,2); +// UPDATE %s SET k=102 WHERE id=1; +// UPDATE %s SET t1='dummy' WHERE id=2; +// UPDATE %s SET t2='dummy' WHERE id=2; +// DELETE FROM %s WHERE id=1; +// INSERT INTO %s(t1,t2,k) SELECT random_string(9000),random_string(9000), +// 1 FROM generate_series(1,2); +// UPDATE %s SET k=1 WHERE id=1; +// UPDATE %s SET t1='dummy1',t2='dummy2' WHERE id=1; +// UPDATE %s SET t1='dummy3' WHERE id=3; +// DELETE FROM %s WHERE id=2; +// DELETE FROM %s WHERE id=3; +// DELETE FROM %s WHERE id=2; +// END; +// `, srcTableName, srcTableName, srcTableName, srcTableName, srcTableName, srcTableName, +// srcTableName, srcTableName, srcTableName, srcTableName, srcTableName, srcTableName)) +// e2e.EnvNoError(s.t, env, err) +// s.t.Log("Executed a transaction touching toast columns") +// e2e.EnvWaitForEqualTables(env, s, "normalizing tx", "test_toast_ch_3", `id,t1,t2,k`) +// env.CancelWorkflow() +// }() + +// env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) +// } + +// func (s PeerFlowE2ETestSuiteCH) Test_Toast_Advance_2_CH() { +// env := e2e.NewTemporalTestWorkflowEnvironment(s.t) + +// srcTableName := s.attachSchemaSuffix("test_toast_ch_4") +// dstTableName := fmt.Sprintf("%s.%s", s.chHelper.testSchemaName, "test_toast_ch_4") + +// _, err := s.conn.Exec(context.Background(), fmt.Sprintf(` +// CREATE TABLE IF NOT EXISTS %s ( +// id SERIAL PRIMARY KEY, +// t1 text, +// k int +// ); +// `, srcTableName)) +// require.NoError(s.t, err) + +// connectionGen := e2e.FlowConnectionGenerationConfig{ +// FlowJobName: s.attachSuffix("test_toast_ch_4"), +// TableNameMapping: map[string]string{srcTableName: dstTableName}, +// PostgresPort: e2e.PostgresPort, +// Destination: s.chHelper.Peer, +// } + +// flowConnConfig := connectionGen.GenerateFlowConnectionConfigs() + +// limits := peerflow.CDCFlowLimits{ +// ExitAfterRecords: -1, +// MaxBatchSize: 100, +// } + +// // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup +// // and execute a transaction touching toast columns +// go func() { +// e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen) +// // complex transaction with random DMLs on a table with toast columns +// _, err = s.conn.Exec(context.Background(), fmt.Sprintf(` +// BEGIN; +// INSERT INTO %s (t1,k) SELECT random_string(9000), +// 1 FROM generate_series(1,1); +// UPDATE %s SET t1=sub.t1 FROM (SELECT random_string(9000) t1 +// FROM generate_series(1,1) ) sub WHERE id=1; +// UPDATE %s SET k=2 WHERE id=1; +// UPDATE %s SET k=3 WHERE id=1; +// UPDATE %s SET t1=sub.t1 FROM (SELECT random_string(9000) t1 +// FROM generate_series(1,1)) sub WHERE id=1; +// UPDATE %s SET k=4 WHERE id=1; +// END; +// `, srcTableName, srcTableName, srcTableName, srcTableName, srcTableName, srcTableName)) +// e2e.EnvNoError(s.t, env, err) +// s.t.Log("Executed a transaction touching toast columns") +// e2e.EnvWaitForEqualTables(env, s, "normalizing tx", "test_toast_ch_4", `id,t1,k`) +// env.CancelWorkflow() +// }() + +// env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) +// } + +// func (s PeerFlowE2ETestSuiteCH) Test_Toast_Advance_3_CH() { +// env := e2e.NewTemporalTestWorkflowEnvironment(s.t) + +// srcTableName := s.attachSchemaSuffix("test_toast_ch_5") +// dstTableName := fmt.Sprintf("%s.%s", s.chHelper.testSchemaName, "test_toast_ch_5") + +// _, err := s.conn.Exec(context.Background(), 
fmt.Sprintf(` +// CREATE TABLE IF NOT EXISTS %s ( +// id SERIAL PRIMARY KEY, +// t1 text, +// t2 text, +// k int +// ); +// `, srcTableName)) +// require.NoError(s.t, err) + +// connectionGen := e2e.FlowConnectionGenerationConfig{ +// FlowJobName: s.attachSuffix("test_toast_ch_5"), +// TableNameMapping: map[string]string{srcTableName: dstTableName}, +// PostgresPort: e2e.PostgresPort, +// Destination: s.chHelper.Peer, +// } + +// flowConnConfig := connectionGen.GenerateFlowConnectionConfigs() + +// limits := peerflow.CDCFlowLimits{ +// ExitAfterRecords: -1, +// MaxBatchSize: 100, +// } + +// // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup +// // and execute a transaction touching toast columns +// go func() { +// e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen) +// /* +// transaction updating a single row +// multiple times with changed/unchanged toast columns +// */ +// _, err = s.conn.Exec(context.Background(), fmt.Sprintf(` +// BEGIN; +// INSERT INTO %s (t1,t2,k) SELECT random_string(9000),random_string(9000), +// 1 FROM generate_series(1,1); +// UPDATE %s SET k=102 WHERE id=1; +// UPDATE %s SET t1='dummy' WHERE id=1; +// UPDATE %s SET t2='dummy' WHERE id=1; +// END; +// `, srcTableName, srcTableName, srcTableName, srcTableName)) +// e2e.EnvNoError(s.t, env, err) +// s.t.Log("Executed a transaction touching toast columns") + +// e2e.EnvWaitForEqualTables(env, s, "normalizing tx", "test_toast_ch_5", `id,t1,t2,k`) +// env.CancelWorkflow() +// }() + +// env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) +// } + +// func (s PeerFlowE2ETestSuiteCH) Test_Types_CH() { +// env := e2e.NewTemporalTestWorkflowEnvironment(s.t) + +// srcTableName := s.attachSchemaSuffix("test_types_ch") +// dstTableName := fmt.Sprintf("%s.%s", s.chHelper.testSchemaName, "test_types_ch") +// createMoodEnum := "CREATE TYPE mood AS ENUM ('happy', 'sad', 'angry');" +// var pgErr *pgconn.PgError +// _, enumErr := s.conn.Exec(context.Background(), createMoodEnum) +// if errors.As(enumErr, &pgErr) && pgErr.Code != pgerrcode.DuplicateObject && !utils.IsUniqueError(pgErr) { +// require.NoError(s.t, enumErr) +// } +// _, err := s.conn.Exec(context.Background(), fmt.Sprintf(` +// CREATE TABLE IF NOT EXISTS %s (id serial PRIMARY KEY,c1 BIGINT,c2 BIT,c3 VARBIT,c4 BOOLEAN, +// c6 BYTEA,c7 CHARACTER,c8 varchar,c9 CIDR,c11 DATE,c12 FLOAT,c13 DOUBLE PRECISION, +// c14 INET,c15 INTEGER,c16 INTERVAL,c17 JSON,c18 JSONB,c21 MACADDR,c22 MONEY, +// c23 NUMERIC(16,5),c24 OID,c28 REAL,c29 SMALLINT,c30 SMALLSERIAL,c31 SERIAL,c32 TEXT, +// c33 TIMESTAMP,c34 TIMESTAMPTZ,c35 TIME, c36 TIMETZ,c37 TSQUERY,c38 TSVECTOR, +// c39 TXID_SNAPSHOT,c40 UUID,c41 XML, c42 GEOMETRY(POINT), c43 GEOGRAPHY(POINT), +// c44 GEOGRAPHY(POLYGON), c45 GEOGRAPHY(LINESTRING), c46 GEOMETRY(LINESTRING), c47 GEOMETRY(POLYGON), +// c48 mood, c49 HSTORE, c50 DATE[], c51 TIMESTAMPTZ[], c52 TIMESTAMP[], c53 BOOLEAN[],c54 SMALLINT[]); +// `, srcTableName)) +// require.NoError(s.t, err) + +// connectionGen := e2e.FlowConnectionGenerationConfig{ +// FlowJobName: s.attachSuffix("test_types_ch"), +// TableNameMapping: map[string]string{srcTableName: dstTableName}, +// PostgresPort: e2e.PostgresPort, +// Destination: s.chHelper.Peer, +// } + +// flowConnConfig := connectionGen.GenerateFlowConnectionConfigs() + +// limits := peerflow.CDCFlowLimits{ +// ExitAfterRecords: 1, +// MaxBatchSize: 100, +// } + +// // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup +// // and execute a transaction 
touching toast columns +// go func() { +// e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen) +// /* test inserting various types*/ +// _, err = s.conn.Exec(context.Background(), fmt.Sprintf(` +// INSERT INTO %s SELECT 2,2,b'1',b'101', +// true,random_bytea(32),'s','test','1.1.10.2'::cidr, +// CURRENT_DATE,1.23,1.234,'192.168.1.5'::inet,1, +// '5 years 2 months 29 days 1 minute 2 seconds 200 milliseconds 20000 microseconds'::interval, +// '{"sai":1}'::json,'{"sai":1}'::jsonb,'08:00:2b:01:02:03'::macaddr, +// 1.2,100.24553,4::oid,1.23,1,1,1,'test',now(),now(),now()::time,now()::timetz, +// 'fat & rat'::tsquery,'a fat cat sat on a mat and ate a fat rat'::tsvector, +// txid_current_snapshot(), +// '66073c38-b8df-4bdb-bbca-1c97596b8940'::uuid,xmlcomment('hello'), +// 'POINT(1 2)','POINT(40.7128 -74.0060)','POLYGON((0 0, 0 1, 1 1, 1 0, 0 0))', +// 'LINESTRING(-74.0060 40.7128, -73.9352 40.7306, -73.9123 40.7831)','LINESTRING(0 0, 1 1, 2 2)', +// 'POLYGON((-74.0060 40.7128, -73.9352 40.7306, -73.9123 40.7831, -74.0060 40.7128))', 'happy','"a"=>"a\"quote\"", "b"=>NULL', +// '{2020-01-01, 2020-01-02}'::date[], +// '{"2020-01-01 01:01:01+00", "2020-01-02 01:01:01+00"}'::timestamptz[], +// '{"2020-01-01 01:01:01", "2020-01-02 01:01:01"}'::timestamp[], +// '{true, false}'::boolean[], +// '{1,2}'::smallint[]; +// `, srcTableName)) +// e2e.EnvNoError(s.t, env, err) +// }() + +// env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) + +// // Verify workflow completes without error +// require.True(s.t, env.IsWorkflowCompleted()) +// err = env.GetWorkflowError() + +// // allow only continue as new error +// require.Contains(s.t, err.Error(), "continue as new") + +// noNulls, err := s.chHelper.CheckNull("test_types_ch", []string{ +// "c41", "c1", "c2", "c3", "c4", +// "c6", "c39", "c40", "id", "c9", "c11", "c12", "c13", "c14", "c15", "c16", "c17", "c18", +// "c21", "c22", "c23", "c24", "c28", "c29", "c30", "c31", "c33", "c34", "c35", "c36", +// "c37", "c38", "c7", "c8", "c32", "c42", "c43", "c44", "c45", "c46", "c47", "c48", "c49", +// "c50", "c51", "c52", "c53", "c54", +// }) +// if err != nil { +// s.t.Log(err) +// } + +// // check if JSON on clickhouse side is a good JSON +// err = s.checkJSONValue(dstTableName, "c17", "sai", "1") +// require.NoError(s.t, err) + +// // check if HSTORE on clickhouse is a good JSON +// err = s.checkJSONValue(dstTableName, "c49", "a", `"a\"quote\""`) +// require.NoError(s.t, err) + +// err = s.checkJSONValue(dstTableName, "c49", "b", "null") +// require.NoError(s.t, err) + +// // Make sure that there are no nulls +// require.True(s.t, noNulls) +// } + +// func (s PeerFlowE2ETestSuiteCH) Test_Multi_Table_CH() { +// env := e2e.NewTemporalTestWorkflowEnvironment(s.t) + +// srcTable1Name := s.attachSchemaSuffix("test1_ch") +// srcTable2Name := s.attachSchemaSuffix("test2_ch") +// dstTable1Name := fmt.Sprintf("%s.%s", s.chHelper.testSchemaName, "test1_ch") +// dstTable2Name := fmt.Sprintf("%s.%s", s.chHelper.testSchemaName, "test2_ch") + +// _, err := s.conn.Exec(context.Background(), fmt.Sprintf(` +// CREATE TABLE IF NOT EXISTS %s (id serial primary key, c1 int, c2 text); +// CREATE TABLE IF NOT EXISTS %s (id serial primary key, c1 int, c2 text); +// `, srcTable1Name, srcTable2Name)) +// require.NoError(s.t, err) + +// connectionGen := e2e.FlowConnectionGenerationConfig{ +// FlowJobName: s.attachSuffix("test_multi_table"), +// TableNameMapping: map[string]string{srcTable1Name: dstTable1Name, srcTable2Name: dstTable2Name}, +// PostgresPort: 
e2e.PostgresPort, +// Destination: s.chHelper.Peer, +// } + +// flowConnConfig := connectionGen.GenerateFlowConnectionConfigs() + +// limits := peerflow.CDCFlowLimits{ +// ExitAfterRecords: 2, +// MaxBatchSize: 100, +// } + +// // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup +// // and execute a transaction touching toast columns +// go func() { +// e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen) +// /* inserting across multiple tables*/ +// _, err = s.conn.Exec(context.Background(), fmt.Sprintf(` +// INSERT INTO %s (c1,c2) VALUES (1,'dummy_1'); +// INSERT INTO %s (c1,c2) VALUES (-1,'dummy_-1'); +// `, srcTable1Name, srcTable2Name)) +// e2e.EnvNoError(s.t, env, err) +// }() + +// env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) + +// // Verify workflow completes without error +// require.True(s.t, env.IsWorkflowCompleted()) +// err = env.GetWorkflowError() + +// count1, err := s.chHelper.CountRows("test1_ch") +// require.NoError(s.t, err) +// count2, err := s.chHelper.CountRows("test2_ch") +// require.NoError(s.t, err) + +// require.Equal(s.t, 1, count1) +// require.Equal(s.t, 1, count2) +// } + +// func (s PeerFlowE2ETestSuiteCH) Test_Simple_Schema_Changes_CH() { +// env := e2e.NewTemporalTestWorkflowEnvironment(s.t) + +// srcTableName := s.attachSchemaSuffix("test_simple_schema_changes") +// dstTableName := fmt.Sprintf("%s.%s", s.chHelper.testSchemaName, "test_simple_schema_changes") + +// _, err := s.conn.Exec(context.Background(), fmt.Sprintf(` +// CREATE TABLE IF NOT EXISTS %s ( +// id BIGINT PRIMARY KEY GENERATED ALWAYS AS IDENTITY, +// c1 BIGINT +// ); +// `, srcTableName)) +// require.NoError(s.t, err) + +// connectionGen := e2e.FlowConnectionGenerationConfig{ +// FlowJobName: s.attachSuffix("test_simple_schema_changes"), +// TableNameMapping: map[string]string{srcTableName: dstTableName}, +// PostgresPort: e2e.PostgresPort, +// Destination: s.chHelper.Peer, +// } + +// flowConnConfig := connectionGen.GenerateFlowConnectionConfigs() + +// limits := peerflow.CDCFlowLimits{ +// ExitAfterRecords: -1, +// MaxBatchSize: 100, +// } + +// // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup +// // and then insert and mutate schema repeatedly. 
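+// Each ALTER TABLE below is followed by an INSERT so the schema change propagates through the CDC stream.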
+// go func() { +// e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen) +// _, err = s.conn.Exec(context.Background(), fmt.Sprintf(` +// INSERT INTO %s(c1) VALUES ($1)`, srcTableName), 1) +// e2e.EnvNoError(s.t, env, err) +// s.t.Log("Inserted initial row in the source table") + +// e2e.EnvWaitForEqualTables(env, s, "normalize reinsert", "test_simple_schema_changes", "id,c1") + +// expectedTableSchema := &protos.TableSchema{ +// TableIdentifier: strings.ToUpper(dstTableName), +// Columns: []*protos.FieldDescription{ +// { +// Name: "ID", +// Type: string(qvalue.QValueKindNumeric), +// TypeModifier: -1, +// }, +// { +// Name: "C1", +// Type: string(qvalue.QValueKindNumeric), +// TypeModifier: -1, +// }, +// { +// Name: "_PEERDB_IS_DELETED", +// Type: string(qvalue.QValueKindBoolean), +// TypeModifier: -1, +// }, +// { +// Name: "_PEERDB_SYNCED_AT", +// Type: string(qvalue.QValueKindTimestamp), +// TypeModifier: -1, +// }, +// }, +// } +// output, err := s.connector.GetTableSchema(&protos.GetTableSchemaBatchInput{ +// TableIdentifiers: []string{dstTableName}, +// }) +// e2e.EnvNoError(s.t, env, err) +// e2e.EnvTrue(s.t, env, e2e.CompareTableSchemas(expectedTableSchema, output.TableNameSchemaMapping[dstTableName])) + +// // alter source table, add column c2 and insert another row. +// _, err = s.conn.Exec(context.Background(), fmt.Sprintf(` +// ALTER TABLE %s ADD COLUMN c2 BIGINT`, srcTableName)) +// e2e.EnvNoError(s.t, env, err) +// s.t.Log("Altered source table, added column c2") +// _, err = s.conn.Exec(context.Background(), fmt.Sprintf(` +// INSERT INTO %s(c1,c2) VALUES ($1,$2)`, srcTableName), 2, 2) +// e2e.EnvNoError(s.t, env, err) +// s.t.Log("Inserted row with added c2 in the source table") + +// // verify we got our two rows, if schema did not match up it will error. +// e2e.EnvWaitForEqualTables(env, s, "normalize altered row", "test_simple_schema_changes", "id,c1,c2") +// expectedTableSchema = &protos.TableSchema{ +// TableIdentifier: strings.ToUpper(dstTableName), +// Columns: []*protos.FieldDescription{ +// { +// Name: "ID", +// Type: string(qvalue.QValueKindNumeric), +// TypeModifier: -1, +// }, +// { +// Name: "C1", +// Type: string(qvalue.QValueKindNumeric), +// TypeModifier: -1, +// }, +// { +// Name: "_PEERDB_SYNCED_AT", +// Type: string(qvalue.QValueKindTimestamp), +// TypeModifier: -1, +// }, +// { +// Name: "C2", +// Type: string(qvalue.QValueKindNumeric), +// TypeModifier: -1, +// }, +// }, +// } +// output, err = s.connector.GetTableSchema(&protos.GetTableSchemaBatchInput{ +// TableIdentifiers: []string{dstTableName}, +// }) +// e2e.EnvNoError(s.t, env, err) +// e2e.EnvTrue(s.t, env, e2e.CompareTableSchemas(expectedTableSchema, output.TableNameSchemaMapping[dstTableName])) +// e2e.EnvEqualTables(env, s, "test_simple_schema_changes", "id,c1,c2") + +// // alter source table, add column c3, drop column c2 and insert another row. +// _, err = s.conn.Exec(context.Background(), fmt.Sprintf(` +// ALTER TABLE %s DROP COLUMN c2, ADD COLUMN c3 BIGINT`, srcTableName)) +// e2e.EnvNoError(s.t, env, err) +// s.t.Log("Altered source table, dropped column c2 and added column c3") +// _, err = s.conn.Exec(context.Background(), fmt.Sprintf(` +// INSERT INTO %s(c1,c3) VALUES ($1,$2)`, srcTableName), 3, 3) +// e2e.EnvNoError(s.t, env, err) +// s.t.Log("Inserted row with added c3 in the source table") + +// // verify we got our two rows, if schema did not match up it will error. 
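+// Note that the expected destination schema still lists C2: dropped source columns are retained on the destination.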
+// e2e.EnvWaitForEqualTables(env, s, "normalize dropped c2 column", "test_simple_schema_changes", "id,c1,c3") +// expectedTableSchema = &protos.TableSchema{ +// TableIdentifier: strings.ToUpper(dstTableName), +// Columns: []*protos.FieldDescription{ +// { +// Name: "ID", +// Type: string(qvalue.QValueKindNumeric), +// TypeModifier: -1, +// }, +// { +// Name: "C1", +// Type: string(qvalue.QValueKindNumeric), +// TypeModifier: -1, +// }, +// { +// Name: "_PEERDB_SYNCED_AT", +// Type: string(qvalue.QValueKindTimestamp), +// TypeModifier: -1, +// }, +// { +// Name: "C2", +// Type: string(qvalue.QValueKindNumeric), +// TypeModifier: -1, +// }, +// { +// Name: "C3", +// Type: string(qvalue.QValueKindNumeric), +// TypeModifier: -1, +// }, +// }, +// } +// output, err = s.connector.GetTableSchema(&protos.GetTableSchemaBatchInput{ +// TableIdentifiers: []string{dstTableName}, +// }) +// e2e.EnvNoError(s.t, env, err) +// e2e.EnvTrue(s.t, env, e2e.CompareTableSchemas(expectedTableSchema, output.TableNameSchemaMapping[dstTableName])) +// e2e.EnvEqualTables(env, s, "test_simple_schema_changes", "id,c1,c3") + +// // alter source table, drop column c3 and insert another row. +// _, err = s.conn.Exec(context.Background(), fmt.Sprintf(` +// ALTER TABLE %s DROP COLUMN c3`, srcTableName)) +// e2e.EnvNoError(s.t, env, err) +// s.t.Log("Altered source table, dropped column c3") +// _, err = s.conn.Exec(context.Background(), fmt.Sprintf(` +// INSERT INTO %s(c1) VALUES ($1)`, srcTableName), 4) +// e2e.EnvNoError(s.t, env, err) +// s.t.Log("Inserted row after dropping all columns in the source table") + +// // verify we got our two rows, if schema did not match up it will error. +// e2e.EnvWaitForEqualTables(env, s, "normalize dropped c3 column", "test_simple_schema_changes", "id,c1") +// expectedTableSchema = &protos.TableSchema{ +// TableIdentifier: strings.ToUpper(dstTableName), +// Columns: []*protos.FieldDescription{ +// { +// Name: "ID", +// Type: string(qvalue.QValueKindNumeric), +// TypeModifier: -1, +// }, +// { +// Name: "C1", +// Type: string(qvalue.QValueKindNumeric), +// TypeModifier: -1, +// }, +// { +// Name: "_PEERDB_SYNCED_AT", +// Type: string(qvalue.QValueKindTimestamp), +// TypeModifier: -1, +// }, +// { +// Name: "C2", +// Type: string(qvalue.QValueKindNumeric), +// TypeModifier: -1, +// }, +// { +// Name: "C3", +// Type: string(qvalue.QValueKindNumeric), +// TypeModifier: -1, +// }, +// }, +// } +// output, err = s.connector.GetTableSchema(&protos.GetTableSchemaBatchInput{ +// TableIdentifiers: []string{dstTableName}, +// }) +// e2e.EnvNoError(s.t, env, err) +// e2e.EnvTrue(s.t, env, e2e.CompareTableSchemas(expectedTableSchema, output.TableNameSchemaMapping[dstTableName])) +// e2e.EnvEqualTables(env, s, "test_simple_schema_changes", "id,c1") + +// env.CancelWorkflow() +// }() + +// env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) +// } + +// func (s PeerFlowE2ETestSuiteCH) Test_Composite_PKey_CH() { +// env := e2e.NewTemporalTestWorkflowEnvironment(s.t) + +// srcTableName := s.attachSchemaSuffix("test_simple_cpkey") +// dstTableName := fmt.Sprintf("%s.%s", s.chHelper.testSchemaName, "test_simple_cpkey") + +// _, err := s.conn.Exec(context.Background(), fmt.Sprintf(` +// CREATE TABLE IF NOT EXISTS %s ( +// id INT GENERATED ALWAYS AS IDENTITY, +// c1 INT GENERATED BY DEFAULT AS IDENTITY, +// c2 INT, +// t TEXT, +// PRIMARY KEY(id,t) +// ); +// `, srcTableName)) +// require.NoError(s.t, err) + +// connectionGen := e2e.FlowConnectionGenerationConfig{ +// 
FlowJobName: s.attachSuffix("test_cpkey_flow"), +// TableNameMapping: map[string]string{srcTableName: dstTableName}, +// PostgresPort: e2e.PostgresPort, +// Destination: s.chHelper.Peer, +// } + +// flowConnConfig := connectionGen.GenerateFlowConnectionConfigs() + +// limits := peerflow.CDCFlowLimits{ +// ExitAfterRecords: -1, +// MaxBatchSize: 100, +// } + +// // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup +// // and then insert, update and delete rows in the table. +// go func() { +// e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen) +// // insert 10 rows into the source table +// for i := 0; i < 10; i++ { +// testValue := fmt.Sprintf("test_value_%d", i) +// _, err = s.conn.Exec(context.Background(), fmt.Sprintf(` +// INSERT INTO %s(c2,t) VALUES ($1,$2) +// `, srcTableName), i, testValue) +// e2e.EnvNoError(s.t, env, err) +// } +// s.t.Log("Inserted 10 rows into the source table") + +// e2e.EnvWaitForEqualTables(env, s, "normalize table", "test_simple_cpkey", "id,c1,c2,t") + +// _, err := s.conn.Exec(context.Background(), +// fmt.Sprintf(`UPDATE %s SET c1=c1+1 WHERE MOD(c2,2)=$1`, srcTableName), 1) +// e2e.EnvNoError(s.t, env, err) +// _, err = s.conn.Exec(context.Background(), fmt.Sprintf(`DELETE FROM %s WHERE MOD(c2,2)=$1`, srcTableName), 0) +// e2e.EnvNoError(s.t, env, err) +// e2e.EnvWaitForEqualTables(env, s, "normalize update/delete", "test_simple_cpkey", "id,c1,c2,t") + +// env.CancelWorkflow() +// }() + +// env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) +// } + +// func (s PeerFlowE2ETestSuiteCH) Test_Composite_PKey_Toast_1_CH() { +// env := e2e.NewTemporalTestWorkflowEnvironment(s.t) + +// srcTableName := s.attachSchemaSuffix("test_cpkey_toast1") +// dstTableName := fmt.Sprintf("%s.%s", s.chHelper.testSchemaName, "test_cpkey_toast1") + +// _, err := s.conn.Exec(context.Background(), fmt.Sprintf(` +// CREATE TABLE IF NOT EXISTS %s ( +// id INT GENERATED ALWAYS AS IDENTITY, +// c1 INT GENERATED BY DEFAULT AS IDENTITY, +// c2 INT, +// t TEXT, +// t2 TEXT, +// PRIMARY KEY(id,t) +// ); +// `, srcTableName)) +// require.NoError(s.t, err) + +// connectionGen := e2e.FlowConnectionGenerationConfig{ +// FlowJobName: s.attachSuffix("test_cpkey_toast1_flow"), +// TableNameMapping: map[string]string{srcTableName: dstTableName}, +// PostgresPort: e2e.PostgresPort, +// Destination: s.chHelper.Peer, +// } + +// flowConnConfig := connectionGen.GenerateFlowConnectionConfigs() + +// limits := peerflow.CDCFlowLimits{ +// ExitAfterRecords: -1, +// MaxBatchSize: 100, +// } + +// // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup +// // and then insert, update and delete rows in the table. 
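+// All of the DML below runs inside a single transaction, so it arrives as one CDC batch.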
+// go func() { +// e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen) +// rowsTx, err := s.conn.Begin(context.Background()) +// e2e.EnvNoError(s.t, env, err) + +// // insert 10 rows into the source table +// for i := 0; i < 10; i++ { +// testValue := fmt.Sprintf("test_value_%d", i) +// _, err = rowsTx.Exec(context.Background(), fmt.Sprintf(` +// INSERT INTO %s(c2,t,t2) VALUES ($1,$2,random_string(9000)) +// `, srcTableName), i, testValue) +// e2e.EnvNoError(s.t, env, err) +// } +// s.t.Log("Inserted 10 rows into the source table") + +// _, err = rowsTx.Exec(context.Background(), +// fmt.Sprintf(`UPDATE %s SET c1=c1+1 WHERE MOD(c2,2)=$1`, srcTableName), 1) +// e2e.EnvNoError(s.t, env, err) +// _, err = rowsTx.Exec(context.Background(), fmt.Sprintf(`DELETE FROM %s WHERE MOD(c2,2)=$1`, srcTableName), 0) +// e2e.EnvNoError(s.t, env, err) + +// err = rowsTx.Commit(context.Background()) +// e2e.EnvNoError(s.t, env, err) + +// e2e.EnvWaitForEqualTables(env, s, "normalizing tx", "test_cpkey_toast1", "id,c1,c2,t,t2") +// env.CancelWorkflow() +// }() + +// env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) +// } + +// func (s PeerFlowE2ETestSuiteCH) Test_Composite_PKey_Toast_2_CH() { +// env := e2e.NewTemporalTestWorkflowEnvironment(s.t) + +// tableName := "test_cpkey_toast2" +// srcTableName := s.attachSchemaSuffix(tableName) +// dstTableName := fmt.Sprintf("%s.%s", s.chHelper.testSchemaName, tableName) + +// _, err := s.conn.Exec(context.Background(), fmt.Sprintf(` +// CREATE TABLE IF NOT EXISTS %s ( +// id INT GENERATED ALWAYS AS IDENTITY, +// c1 INT GENERATED BY DEFAULT AS IDENTITY, +// c2 INT, +// t TEXT, +// t2 TEXT, +// PRIMARY KEY(id,t) +// ); +// `, srcTableName)) +// require.NoError(s.t, err) + +// connectionGen := e2e.FlowConnectionGenerationConfig{ +// FlowJobName: s.attachSuffix(tableName), +// TableNameMapping: map[string]string{srcTableName: dstTableName}, +// PostgresPort: e2e.PostgresPort, +// Destination: s.chHelper.Peer, +// } + +// flowConnConfig := connectionGen.GenerateFlowConnectionConfigs() + +// limits := peerflow.CDCFlowLimits{ +// ExitAfterRecords: -1, +// MaxBatchSize: 100, +// } + +// // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup +// // and then insert, update and delete rows in the table. 
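+// Unlike the previous test, these statements auto-commit individually, with an equality wait between the insert and update/delete phases.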
+// go func() { +// e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen) + +// // insert 10 rows into the source table +// for i := 0; i < 10; i++ { +// testValue := fmt.Sprintf("test_value_%d", i) +// _, err = s.conn.Exec(context.Background(), fmt.Sprintf(` +// INSERT INTO %s(c2,t,t2) VALUES ($1,$2,random_string(9000)) +// `, srcTableName), i, testValue) +// e2e.EnvNoError(s.t, env, err) +// } +// s.t.Log("Inserted 10 rows into the source table") + +// e2e.EnvWaitForEqualTables(env, s, "normalize table", tableName, "id,c2,t,t2") +// _, err = s.conn.Exec(context.Background(), +// fmt.Sprintf(`UPDATE %s SET c1=c1+1 WHERE MOD(c2,2)=$1`, srcTableName), 1) +// e2e.EnvNoError(s.t, env, err) +// _, err = s.conn.Exec(context.Background(), fmt.Sprintf(`DELETE FROM %s WHERE MOD(c2,2)=$1`, srcTableName), 0) +// e2e.EnvNoError(s.t, env, err) +// e2e.EnvWaitForEqualTables(env, s, "normalize update/delete", tableName, "id,c2,t,t2") + +// env.CancelWorkflow() +// }() + +// env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) +// } + +// func (s PeerFlowE2ETestSuiteCH) Test_Column_Exclusion() { +// env := e2e.NewTemporalTestWorkflowEnvironment(s.t) + +// tableName := "test_exclude_ch" +// srcTableName := s.attachSchemaSuffix(tableName) +// dstTableName := fmt.Sprintf("%s.%s", s.chHelper.testSchemaName, tableName) + +// _, err := s.conn.Exec(context.Background(), fmt.Sprintf(` +// CREATE TABLE IF NOT EXISTS %s ( +// id INT GENERATED ALWAYS AS IDENTITY, +// c1 INT GENERATED BY DEFAULT AS IDENTITY, +// c2 INT, +// t TEXT, +// t2 TEXT, +// PRIMARY KEY(id,t) +// ); +// `, srcTableName)) +// require.NoError(s.t, err) + +// connectionGen := e2e.FlowConnectionGenerationConfig{ +// FlowJobName: s.attachSuffix(tableName), +// } + +// config := &protos.FlowConnectionConfigs{ +// FlowJobName: connectionGen.FlowJobName, +// Destination: s.chHelper.Peer, +// TableMappings: []*protos.TableMapping{ +// { +// SourceTableIdentifier: srcTableName, +// DestinationTableIdentifier: dstTableName, +// Exclude: []string{"c2"}, +// }, +// }, +// Source: e2e.GeneratePostgresPeer(e2e.PostgresPort), +// CdcStagingPath: connectionGen.CdcStagingPath, +// SyncedAtColName: "_PEERDB_SYNCED_AT", +// } + +// limits := peerflow.CDCFlowLimits{ +// ExitAfterRecords: -1, +// MaxBatchSize: 100, +// } + +// // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup +// // and then insert, update and delete rows in the table. 
+// go func() { +// e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen) + +// // insert 10 rows into the source table +// for i := 0; i < 10; i++ { +// testValue := fmt.Sprintf("test_value_%d", i) +// _, err = s.conn.Exec(context.Background(), fmt.Sprintf(` +// INSERT INTO %s(c2,t,t2) VALUES ($1,$2,random_string(100)) +// `, srcTableName), i, testValue) +// e2e.EnvNoError(s.t, env, err) +// } +// s.t.Log("Inserted 10 rows into the source table") + +// e2e.EnvWaitForEqualTables(env, s, "normalize table", tableName, "id,c1,t,t2") +// _, err = s.conn.Exec(context.Background(), +// fmt.Sprintf(`UPDATE %s SET c1=c1+1 WHERE MOD(c2,2)=1`, srcTableName)) +// e2e.EnvNoError(s.t, env, err) +// _, err = s.conn.Exec(context.Background(), fmt.Sprintf(`DELETE FROM %s WHERE MOD(c2,2)=0`, srcTableName)) +// e2e.EnvNoError(s.t, env, err) +// e2e.EnvWaitForEqualTables(env, s, "normalize update/delete", tableName, "id,c1,t,t2") + +// env.CancelWorkflow() +// }() + +// env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, config, &limits, nil) + +// chRows, err := s.GetRows(tableName, "*") +// require.NoError(s.t, err) + +// for _, field := range chRows.Schema.Fields { +// require.NotEqual(s.t, "c2", field.Name) +// } +// require.Len(s.t, chRows.Schema.Fields, 5) +// } + +// func (s PeerFlowE2ETestSuiteCH) Test_Soft_Delete_Basic() { +// env := e2e.NewTemporalTestWorkflowEnvironment(s.t) + +// tableName := "test_softdel_src" +// dstName := "test_softdel" +// srcTableName := s.attachSchemaSuffix(tableName) +// dstTableName := fmt.Sprintf("%s.%s", s.chHelper.testSchemaName, dstName) + +// _, err := s.conn.Exec(context.Background(), fmt.Sprintf(` +// CREATE TABLE IF NOT EXISTS %s ( +// id INT PRIMARY KEY GENERATED ALWAYS AS IDENTITY, +// c1 INT, +// c2 INT, +// t TEXT +// ); +// `, srcTableName)) +// require.NoError(s.t, err) + +// connectionGen := e2e.FlowConnectionGenerationConfig{ +// FlowJobName: s.attachSuffix(dstName), +// } + +// config := &protos.FlowConnectionConfigs{ +// FlowJobName: connectionGen.FlowJobName, +// Destination: s.chHelper.Peer, +// TableMappings: []*protos.TableMapping{ +// { +// SourceTableIdentifier: srcTableName, +// DestinationTableIdentifier: dstTableName, +// }, +// }, +// Source: e2e.GeneratePostgresPeer(e2e.PostgresPort), +// CdcStagingPath: connectionGen.CdcStagingPath, +// SoftDelete: true, +// SoftDeleteColName: "_PEERDB_IS_DELETED", +// SyncedAtColName: "_PEERDB_SYNCED_AT", +// } + +// limits := peerflow.CDCFlowLimits{ +// ExitAfterRecords: -1, +// MaxBatchSize: 100, +// } + +// // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup +// // and then insert, update and delete rows in the table. 
+// go func() { +// e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen) + +// _, err = s.conn.Exec(context.Background(), fmt.Sprintf(` +// INSERT INTO %s(c1,c2,t) VALUES (1,2,random_string(9000))`, srcTableName)) +// e2e.EnvNoError(s.t, env, err) +// e2e.EnvWaitForEqualTablesWithNames(env, s, "normalize row", tableName, dstName, "id,c1,c2,t") +// _, err = s.conn.Exec(context.Background(), fmt.Sprintf(` +// UPDATE %s SET c1=c1+4 WHERE id=1`, srcTableName)) +// e2e.EnvNoError(s.t, env, err) +// e2e.EnvWaitForEqualTablesWithNames(env, s, "normalize update", tableName, dstName, "id,c1,c2,t") +// _, err = s.conn.Exec(context.Background(), fmt.Sprintf(` +// DELETE FROM %s WHERE id=1`, srcTableName)) +// e2e.EnvNoError(s.t, env, err) +// e2e.EnvWaitForEqualTablesWithNames( +// env, +// s, +// "normalize delete", +// tableName, +// dstName+" WHERE NOT _PEERDB_IS_DELETED", +// "id,c1,c2,t", +// ) + +// env.CancelWorkflow() +// }() + +// env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, config, &limits, nil) + +// newerSyncedAtQuery := fmt.Sprintf(` +// SELECT COUNT(*) FROM %s WHERE _PEERDB_IS_DELETED`, dstTableName) +// numNewRows, err := s.chHelper.RunIntQuery(newerSyncedAtQuery) +// require.NoError(s.t, err) +// require.Equal(s.t, 1, numNewRows) +// } + +// func (s PeerFlowE2ETestSuiteCH) Test_Soft_Delete_IUD_Same_Batch() { +// env := e2e.NewTemporalTestWorkflowEnvironment(s.t) + +// cmpTableName := s.attachSchemaSuffix("test_softdel_iud") +// srcTableName := fmt.Sprintf("%s_src", cmpTableName) +// dstTableName := fmt.Sprintf("%s.%s", s.chHelper.testSchemaName, "test_softdel_iud") + +// _, err := s.conn.Exec(context.Background(), fmt.Sprintf(` +// CREATE TABLE IF NOT EXISTS %s ( +// id INT PRIMARY KEY GENERATED ALWAYS AS IDENTITY, +// c1 INT, +// c2 INT, +// t TEXT +// ); +// `, srcTableName)) +// require.NoError(s.t, err) + +// connectionGen := e2e.FlowConnectionGenerationConfig{ +// FlowJobName: s.attachSuffix("test_softdel_iud"), +// } + +// config := &protos.FlowConnectionConfigs{ +// FlowJobName: connectionGen.FlowJobName, +// Destination: s.chHelper.Peer, +// TableMappings: []*protos.TableMapping{ +// { +// SourceTableIdentifier: srcTableName, +// DestinationTableIdentifier: dstTableName, +// }, +// }, +// Source: e2e.GeneratePostgresPeer(e2e.PostgresPort), +// CdcStagingPath: connectionGen.CdcStagingPath, +// SoftDelete: true, +// SoftDeleteColName: "_PEERDB_IS_DELETED", +// SyncedAtColName: "_PEERDB_SYNCED_AT", +// } + +// limits := peerflow.CDCFlowLimits{ +// ExitAfterRecords: -1, +// MaxBatchSize: 100, +// } + +// // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup +// // and then insert, update and delete rows in the table. 
+// go func() { +// e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen) + +// insertTx, err := s.conn.Begin(context.Background()) +// e2e.EnvNoError(s.t, env, err) + +// _, err = insertTx.Exec(context.Background(), fmt.Sprintf(` +// INSERT INTO %s(c1,c2,t) VALUES (1,2,random_string(9000))`, srcTableName)) +// e2e.EnvNoError(s.t, env, err) +// _, err = insertTx.Exec(context.Background(), fmt.Sprintf(` +// UPDATE %s SET c1=c1+4 WHERE id=1`, srcTableName)) +// e2e.EnvNoError(s.t, env, err) +// // since we delete stuff, create another table to compare with +// _, err = insertTx.Exec(context.Background(), fmt.Sprintf(` +// CREATE TABLE %s AS SELECT * FROM %s`, cmpTableName, srcTableName)) +// e2e.EnvNoError(s.t, env, err) +// _, err = insertTx.Exec(context.Background(), fmt.Sprintf(` +// DELETE FROM %s WHERE id=1`, srcTableName)) +// e2e.EnvNoError(s.t, env, err) + +// e2e.EnvNoError(s.t, env, insertTx.Commit(context.Background())) + +// e2e.EnvWaitForEqualTables(env, s, "normalizing tx", "test_softdel_iud", "id,c1,c2,t") +// e2e.EnvWaitFor(s.t, env, 3*time.Minute, "checking soft delete", func() bool { +// newerSyncedAtQuery := fmt.Sprintf(` +// SELECT COUNT(*) FROM %s WHERE _PEERDB_IS_DELETED`, dstTableName) +// numNewRows, err := s.chHelper.RunIntQuery(newerSyncedAtQuery) +// e2e.EnvNoError(s.t, env, err) +// return numNewRows == 1 +// }) + +// env.CancelWorkflow() +// }() + +// env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, config, &limits, nil) +// } + +// func (s PeerFlowE2ETestSuiteCH) Test_Soft_Delete_UD_Same_Batch() { +// env := e2e.NewTemporalTestWorkflowEnvironment(s.t) + +// tableName := "test_softdel_ud_src" +// dstName := "test_softdel_ud" +// srcTableName := s.attachSchemaSuffix(tableName) +// dstTableName := fmt.Sprintf("%s.%s", s.chHelper.testSchemaName, dstName) + +// _, err := s.conn.Exec(context.Background(), fmt.Sprintf(` +// CREATE TABLE IF NOT EXISTS %s ( +// id INT PRIMARY KEY GENERATED ALWAYS AS IDENTITY, +// c1 INT, +// c2 INT, +// t TEXT +// ); +// `, srcTableName)) +// require.NoError(s.t, err) + +// connectionGen := e2e.FlowConnectionGenerationConfig{ +// FlowJobName: s.attachSuffix(dstName), +// } + +// config := &protos.FlowConnectionConfigs{ +// FlowJobName: connectionGen.FlowJobName, +// Destination: s.chHelper.Peer, +// TableMappings: []*protos.TableMapping{ +// { +// SourceTableIdentifier: srcTableName, +// DestinationTableIdentifier: dstTableName, +// }, +// }, +// Source: e2e.GeneratePostgresPeer(e2e.PostgresPort), +// CdcStagingPath: connectionGen.CdcStagingPath, +// SoftDelete: true, +// SoftDeleteColName: "_PEERDB_IS_DELETED", +// SyncedAtColName: "_PEERDB_SYNCED_AT", +// } + +// limits := peerflow.CDCFlowLimits{ +// ExitAfterRecords: -1, +// MaxBatchSize: 100, +// } + +// // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup +// // and then insert, update and delete rows in the table. 
+// go func() { +// e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen) + +// _, err = s.conn.Exec(context.Background(), fmt.Sprintf(` +// INSERT INTO %s(c1,c2,t) VALUES (1,2,random_string(9000))`, srcTableName)) +// e2e.EnvNoError(s.t, env, err) +// e2e.EnvWaitForEqualTablesWithNames(env, s, "normalize insert", tableName, dstName, "id,c1,c2,t") + +// insertTx, err := s.conn.Begin(context.Background()) +// e2e.EnvNoError(s.t, env, err) +// _, err = insertTx.Exec(context.Background(), fmt.Sprintf(` +// UPDATE %s SET t=random_string(10000) WHERE id=1`, srcTableName)) +// e2e.EnvNoError(s.t, env, err) +// _, err = insertTx.Exec(context.Background(), fmt.Sprintf(` +// UPDATE %s SET c1=c1+4 WHERE id=1`, srcTableName)) +// e2e.EnvNoError(s.t, env, err) +// _, err = insertTx.Exec(context.Background(), fmt.Sprintf(` +// DELETE FROM %s WHERE id=1`, srcTableName)) +// e2e.EnvNoError(s.t, env, err) + +// e2e.EnvNoError(s.t, env, insertTx.Commit(context.Background())) +// e2e.EnvWaitForEqualTablesWithNames( +// env, +// s, +// "normalize transaction", +// tableName, +// dstName+" WHERE NOT _PEERDB_IS_DELETED", +// "id,c1,c2,t", +// ) +// e2e.EnvWaitFor(s.t, env, 3*time.Minute, "checking soft delete", func() bool { +// newerSyncedAtQuery := fmt.Sprintf(` +// SELECT COUNT(*) FROM %s WHERE _PEERDB_IS_DELETED`, dstTableName) +// numNewRows, err := s.chHelper.RunIntQuery(newerSyncedAtQuery) +// e2e.EnvNoError(s.t, env, err) +// return numNewRows == 1 +// }) + +// env.CancelWorkflow() +// }() + +// env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, config, &limits, nil) +// } + +// func (s PeerFlowE2ETestSuiteCH) Test_Soft_Delete_Insert_After_Delete() { +// env := e2e.NewTemporalTestWorkflowEnvironment(s.t) + +// tableName := "test_softdel_iad" +// srcTableName := s.attachSchemaSuffix(tableName) +// dstTableName := fmt.Sprintf("%s.%s", s.chHelper.testSchemaName, tableName) + +// _, err := s.conn.Exec(context.Background(), fmt.Sprintf(` +// CREATE TABLE IF NOT EXISTS %s ( +// id INT PRIMARY KEY GENERATED BY DEFAULT AS IDENTITY, +// c1 INT, +// c2 INT, +// t TEXT +// ); +// `, srcTableName)) +// require.NoError(s.t, err) + +// connectionGen := e2e.FlowConnectionGenerationConfig{ +// FlowJobName: s.attachSuffix(tableName), +// } + +// config := &protos.FlowConnectionConfigs{ +// FlowJobName: connectionGen.FlowJobName, +// Destination: s.chHelper.Peer, +// TableMappings: []*protos.TableMapping{ +// { +// SourceTableIdentifier: srcTableName, +// DestinationTableIdentifier: dstTableName, +// }, +// }, +// Source: e2e.GeneratePostgresPeer(e2e.PostgresPort), +// CdcStagingPath: connectionGen.CdcStagingPath, +// SoftDelete: true, +// SoftDeleteColName: "_PEERDB_IS_DELETED", +// SyncedAtColName: "_PEERDB_SYNCED_AT", +// } + +// limits := peerflow.CDCFlowLimits{ +// ExitAfterRecords: -1, +// MaxBatchSize: 100, +// } + +// // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup +// // and then insert and delete rows in the table. 
+// go func() { +// e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen) + +// _, err = s.conn.Exec(context.Background(), fmt.Sprintf(` +// INSERT INTO %s(c1,c2,t) VALUES (1,2,random_string(9000))`, srcTableName)) +// e2e.EnvNoError(s.t, env, err) +// e2e.EnvWaitForEqualTables(env, s, "normalize row", tableName, "id,c1,c2,t") +// _, err = s.conn.Exec(context.Background(), fmt.Sprintf(` +// DELETE FROM %s WHERE id=1`, srcTableName)) +// e2e.EnvNoError(s.t, env, err) +// e2e.EnvWaitForEqualTablesWithNames( +// env, +// s, +// "normalize delete", +// tableName, +// tableName+" WHERE NOT _PEERDB_IS_DELETED", +// "id,c1,c2,t", +// ) + +// _, err = s.conn.Exec(context.Background(), fmt.Sprintf(` +// INSERT INTO %s(id,c1,c2,t) VALUES (1,3,4,random_string(10000))`, srcTableName)) +// e2e.EnvNoError(s.t, env, err) +// e2e.EnvWaitForEqualTables(env, s, "normalize reinsert", tableName, "id,c1,c2,t") + +// env.CancelWorkflow() +// }() + +// env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, config, &limits, nil) + +// newerSyncedAtQuery := fmt.Sprintf(` +// SELECT COUNT(*) FROM %s WHERE _PEERDB_IS_DELETED`, dstTableName) +// numNewRows, err := s.chHelper.RunIntQuery(newerSyncedAtQuery) +// require.NoError(s.t, err) +// require.Equal(s.t, 0, numNewRows) +// } + +// func (s PeerFlowE2ETestSuiteCH) Test_Supported_Mixed_Case_Table_CH() { +// env := e2e.NewTemporalTestWorkflowEnvironment(s.t) + +// srcTableName := s.attachSchemaSuffix("testMixedCase") +// dstTableName := fmt.Sprintf("%s.%s", s.chHelper.testSchemaName, "testMixedCase") + +// _, err := s.conn.Exec(context.Background(), fmt.Sprintf(` +// CREATE TABLE IF NOT EXISTS e2e_test_%s."%s" ( +// "pulseArmor" SERIAL PRIMARY KEY, +// "highGold" TEXT NOT NULL, +// "eVe" TEXT NOT NULL, +// id SERIAL +// ); +// `, s.pgSuffix, "testMixedCase")) +// require.NoError(s.t, err) + +// connectionGen := e2e.FlowConnectionGenerationConfig{ +// FlowJobName: s.attachSuffix("test_mixed_case"), +// TableNameMapping: map[string]string{srcTableName: dstTableName}, +// PostgresPort: e2e.PostgresPort, +// Destination: s.chHelper.Peer, +// } + +// flowConnConfig := connectionGen.GenerateFlowConnectionConfigs() + +// limits := peerflow.CDCFlowLimits{ +// ExitAfterRecords: -1, +// MaxBatchSize: 100, +// } + +// // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup +// // and then insert 20 rows into the source table +// go func() { +// e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen) +// // insert 20 rows into the source table +// for i := 0; i < 20; i++ { +// testKey := fmt.Sprintf("test_key_%d", i) +// testValue := fmt.Sprintf("test_value_%d", i) +// _, err = s.conn.Exec(context.Background(), fmt.Sprintf(` +// INSERT INTO e2e_test_%s."%s"("highGold","eVe") VALUES ($1, $2) +// `, s.pgSuffix, "testMixedCase"), testKey, testValue) +// e2e.EnvNoError(s.t, env, err) +// } +// s.t.Log("Inserted 20 rows into the source table") +// e2e.EnvWaitForEqualTablesWithNames( +// env, +// s, +// "normalize mixed case", +// "testMixedCase", +// "\"testMixedCase\"", +// "id,\"pulseArmor\",\"highGold\",\"eVe\"", +// ) + +// env.CancelWorkflow() +// }() + +// env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) +// } diff --git a/flow/e2e/clickhouse/qrep_flow_ch_test.go b/flow/e2e/clickhouse/qrep_flow_ch_test.go new file mode 100644 index 0000000000..b4bb68f2f8 --- /dev/null +++ b/flow/e2e/clickhouse/qrep_flow_ch_test.go @@ -0,0 +1,317 @@ +package e2e_clickhouse + +import ( + "fmt" + "log/slog" + + "github.com/google/uuid" + 
"github.com/stretchr/testify/require" + + "github.com/PeerDB-io/peer-flow/e2e" +) + +//nolint:unparam +func (s PeerFlowE2ETestSuiteCH) setupSourceTable(tableName string, numRows int, fieldNames []string) { + err := e2e.CreateTableForQRepNew(s.conn, s.pgSuffix, tableName, fieldNames) + fmt.Printf("\n**************************** setupSourceTable created") + require.NoError(s.t, err) + err = e2e.PopulateSourceTableNew(s.conn, s.pgSuffix, tableName, numRows, fieldNames) + require.NoError(s.t, err) +} + +func (s PeerFlowE2ETestSuiteCH) setupCHDestinationTable(dstTable string, fieldNames []string) { + schema := e2e.GetOwnersSchemaNew(fieldNames) + //TODO: write your own table creation logic for ch or modify the one in chHelper() + err := s.chHelper.CreateTable(dstTable, schema) + // fail if table creation fails + if err != nil { + require.FailNow(s.t, "unable to create table on clickhouse", err) + } + + slog.Info(fmt.Sprintf("created table on clickhouse: %s.%s.", s.chHelper.testDatabaseName, dstTable)) +} + +// func (s PeerFlowE2ETestSuiteCH) checkJSONValue(tableName, colName, fieldName, value string) error { +// res, err := s.chHelper.ExecuteAndProcessQuery(fmt.Sprintf("SELECT %s:%s FROM %s", colName, fieldName, tableName)) +// if err != nil { +// return fmt.Errorf("json value check failed: %v", err) +// } + +// if len(res.Records) == 0 { +// return fmt.Errorf("bad json: empty result set from %s", tableName) +// } + +// jsonVal := res.Records[0][0].Value +// if jsonVal != value { +// return fmt.Errorf("bad json value in field %s of column %s: %v. expected: %v", fieldName, colName, jsonVal, value) +// } +// return nil +// } + +func (s PeerFlowE2ETestSuiteCH) compareTableContentsWithDiffSelectorsCH(tableName, pgSelector, chSelector string) { + pgRows, err := e2e.GetPgRows(s.conn, s.pgSuffix, tableName, pgSelector) + require.NoError(s.t, err) + + chRows, err := s.GetRows(tableName, chSelector) + require.NoError(s.t, err) + + e2e.RequireEqualRecordBatches(s.t, pgRows, chRows) +} + +// func (s PeerFlowE2ETestSuiteCH) Test_Complete_QRep_Flow_Avro_CH() { +// env := e2e.NewTemporalTestWorkflowEnvironment(s.t) + +// numRows := 10 + +// tblName := "test_qrep_flow_avro_ch" +// s.setupSourceTable(tblName, numRows) + +// dstSchemaQualified := fmt.Sprintf("%s.%s", s.chHelper.testSchemaName, tblName) + +// query := fmt.Sprintf("SELECT * FROM e2e_test_%s.%s WHERE updated_at BETWEEN {{.start}} AND {{.end}}", +// s.pgSuffix, tblName) + +// qrepConfig, err := e2e.CreateQRepWorkflowConfig( +// "test_qrep_flow_avro_ch", +// fmt.Sprintf("e2e_test_%s.%s", s.pgSuffix, tblName), +// dstSchemaQualified, +// query, +// s.chHelper.Peer, +// "", +// false, +// "", +// ) +// qrepConfig.SetupWatermarkTableOnDestination = true +// require.NoError(s.t, err) + +// e2e.RunQrepFlowWorkflow(env, qrepConfig) + +// // Verify workflow completes without error +// require.True(s.t, env.IsWorkflowCompleted()) + +// err = env.GetWorkflowError() +// require.NoError(s.t, err) + +// sel := e2e.GetOwnersSelectorStringsCH() +// s.compareTableContentsWithDiffSelectorsCH(tblName, sel[0], sel[1]) + +// err = s.checkJSONValue(dstSchemaQualified, "f7", "key", "\"value\"") +// require.NoError(s.t, err) +// } + +// func (s PeerFlowE2ETestSuiteCH) Test_Complete_QRep_Flow_Avro_CH_Upsert_Simple() { +// env := e2e.NewTemporalTestWorkflowEnvironment(s.t) + +// numRows := 10 + +// tblName := "test_qrep_flow_avro_ch_ups" +// s.setupSourceTable(tblName, numRows) + +// dstSchemaQualified := fmt.Sprintf("%s.%s", s.chHelper.testSchemaName, tblName) + +// query 
:= fmt.Sprintf("SELECT * FROM e2e_test_%s.%s WHERE updated_at BETWEEN {{.start}} AND {{.end}}", +// s.pgSuffix, tblName) + +// qrepConfig, err := e2e.CreateQRepWorkflowConfig( +// "test_qrep_flow_avro_ch", +// fmt.Sprintf("e2e_test_%s.%s", s.pgSuffix, tblName), +// dstSchemaQualified, +// query, +// s.chHelper.Peer, +// "", +// false, +// "", +// ) +// qrepConfig.WriteMode = &protos.QRepWriteMode{ +// WriteType: protos.QRepWriteType_QREP_WRITE_MODE_UPSERT, +// UpsertKeyColumns: []string{"id"}, +// } +// qrepConfig.SetupWatermarkTableOnDestination = true +// require.NoError(s.t, err) + +// e2e.RunQrepFlowWorkflow(env, qrepConfig) + +// // Verify workflow completes without error +// require.True(s.t, env.IsWorkflowCompleted()) + +// err = env.GetWorkflowError() +// require.NoError(s.t, err) + +// sel := e2e.GetOwnersSelectorStringsCH() +// s.compareTableContentsWithDiffSelectorsCH(tblName, sel[0], sel[1]) +// } + +func (s PeerFlowE2ETestSuiteCH) Test_Complete_QRep_Flow_Avro_CH_S3() { + fieldNames := []string{"id", "ownerable_type", "created_at", "updated_at"} + fmt.Printf("\n*********************************............Test_Complete_QRep_Flow_Avro_CH_S3") + env := e2e.NewTemporalTestWorkflowEnvironment(s.t) + + numRows := 10 + + tblName := "test_qrep_flow_avro_ch_s3" + s.setupSourceTable(tblName, numRows, fieldNames) + s.setupCHDestinationTable(tblName, fieldNames) //As currently qrep doesnot create destination table on its own + fmt.Printf("\n*********************************............Test_Complete_QRep_Flow_Avro_CH_S3 1 setupSourceTable done\n") + + dstSchemaQualified := fmt.Sprintf("%s.%s", s.chHelper.testSchemaName, tblName) + + query := fmt.Sprintf("SELECT * FROM e2e_test_%s.%s WHERE updated_at BETWEEN {{.start}} AND {{.end}}", + s.pgSuffix, tblName) + + qrepConfig, err := e2e.CreateQRepWorkflowConfig( + "test_qrep_flow_avro_ch", + s.attachSchemaSuffix(tblName), + dstSchemaQualified, + query, + s.chHelper.Peer, + "", + false, + "", + ) + fmt.Printf("\n*********************************............Test_Complete_QRep_Flow_Avro_CH_S3 2\n") + require.NoError(s.t, err) + qrepConfig.StagingPath = fmt.Sprintf("s3://peerdb-test-bucket/avro/%s", uuid.New()) + qrepConfig.SetupWatermarkTableOnDestination = true + fmt.Printf("\n*********************************............Test_Complete_QRep_Flow_Avro_CH_S3 3\n") + e2e.RunQrepFlowWorkflow(env, qrepConfig) + fmt.Printf("\n*********************************............Test_Complete_QRep_Flow_Avro_CH_S3 4\n") + // Verify workflow completes without error + require.True(s.t, env.IsWorkflowCompleted()) + fmt.Printf("\n*********************************............Test_Complete_QRep_Flow_Avro_CH_S3 5\n") + err = env.GetWorkflowError() + fmt.Printf("\n*********************************............Test_Complete_QRep_Flow_Avro_CH_S3 5.5\n %+v", err) + require.NoError(s.t, err) + fmt.Printf("\n*********************************............Test_Complete_QRep_Flow_Avro_CH_S3 6\n") + sel := e2e.GetOwnersSelectorStringsCH(fieldNames) + fmt.Printf("\n*********************************............Test_Complete_QRep_Flow_Avro_CH_S3 7\n") + s.compareTableContentsWithDiffSelectorsCH(tblName, sel[0], sel[1]) + fmt.Printf("\n*********************************............Test_Complete_QRep_Flow_Avro_CH_S3 8\n") +} + +// func (s PeerFlowE2ETestSuiteCH) Test_Complete_QRep_Flow_Avro_CH_Upsert_XMIN() { +// env := e2e.NewTemporalTestWorkflowEnvironment(s.t) + +// numRows := 10 + +// tblName := "test_qrep_flow_avro_ch_ups_xmin" +// s.setupSourceTable(tblName, numRows) + +// 
dstSchemaQualified := fmt.Sprintf("%s.%s", s.chHelper.testSchemaName, tblName) + +// query := fmt.Sprintf("SELECT * FROM e2e_test_%s.%s", +// s.pgSuffix, tblName) + +// qrepConfig, err := e2e.CreateQRepWorkflowConfig( +// "test_qrep_flow_avro_ch_xmin", +// fmt.Sprintf("e2e_test_%s.%s", s.pgSuffix, tblName), +// dstSchemaQualified, +// query, +// s.chHelper.Peer, +// "", +// false, +// "", +// ) +// qrepConfig.WriteMode = &protos.QRepWriteMode{ +// WriteType: protos.QRepWriteType_QREP_WRITE_MODE_UPSERT, +// UpsertKeyColumns: []string{"id"}, +// } +// qrepConfig.WatermarkColumn = "xmin" +// qrepConfig.SetupWatermarkTableOnDestination = true +// require.NoError(s.t, err) + +// e2e.RunXminFlowWorkflow(env, qrepConfig) + +// // Verify workflow completes without error +// require.True(s.t, env.IsWorkflowCompleted()) + +// err = env.GetWorkflowError() +// require.NoError(s.t, err) + +// sel := e2e.GetOwnersSelectorStringsCH() +// s.compareTableContentsWithDiffSelectorsCH(tblName, sel[0], sel[1]) +// } + +// func (s PeerFlowE2ETestSuiteCH) Test_Complete_QRep_Flow_Avro_CH_S3_Integration() { +// env := e2e.NewTemporalTestWorkflowEnvironment(s.t) + +// numRows := 10 + +// tblName := "test_qrep_flow_avro_ch_s3_int" +// s.setupSourceTable(tblName, numRows) + +// dstSchemaQualified := fmt.Sprintf("%s.%s", s.chHelper.testSchemaName, tblName) + +// query := fmt.Sprintf("SELECT * FROM e2e_test_%s.%s WHERE updated_at BETWEEN {{.start}} AND {{.end}}", +// s.pgSuffix, tblName) + +// chPeer := s.chHelper.Peer +// chPeer.GetClickhouseConfig().S3Integration = "peerdb_s3_integration" + +// qrepConfig, err := e2e.CreateQRepWorkflowConfig( +// "test_qrep_flow_avro_ch_int", +// s.attachSchemaSuffix(tblName), +// dstSchemaQualified, +// query, +// chPeer, +// "", +// false, +// "", +// ) +// require.NoError(s.t, err) +// qrepConfig.StagingPath = fmt.Sprintf("s3://peerdb-test-bucket/avro/%s", uuid.New()) +// qrepConfig.SetupWatermarkTableOnDestination = true + +// e2e.RunQrepFlowWorkflow(env, qrepConfig) + +// // Verify workflow completes without error +// require.True(s.t, env.IsWorkflowCompleted()) + +// err = env.GetWorkflowError() +// require.NoError(s.t, err) + +// sel := e2e.GetOwnersSelectorStringsCH() +// s.compareTableContentsWithDiffSelectorsCH(tblName, sel[0], sel[1]) +// } + +// func (s PeerFlowE2ETestSuiteCH) Test_PeerDB_Columns_QRep_CH() { +// env := e2e.NewTemporalTestWorkflowEnvironment(s.t) + +// numRows := 10 + +// tblName := "test_qrep_columns_ch" +// s.setupSourceTable(tblName, numRows) + +// dstSchemaQualified := fmt.Sprintf("%s.%s", s.chHelper.testSchemaName, tblName) + +// query := fmt.Sprintf("SELECT * FROM e2e_test_%s.%s WHERE updated_at BETWEEN {{.start}} AND {{.end}}", +// s.pgSuffix, tblName) + +// qrepConfig, err := e2e.CreateQRepWorkflowConfig( +// "test_columns_qrep_ch", +// fmt.Sprintf("e2e_test_%s.%s", s.pgSuffix, tblName), +// dstSchemaQualified, +// query, +// s.chHelper.Peer, +// "", +// true, +// "_PEERDB_SYNCED_AT", +// ) +// qrepConfig.WriteMode = &protos.QRepWriteMode{ +// WriteType: protos.QRepWriteType_QREP_WRITE_MODE_UPSERT, +// UpsertKeyColumns: []string{"id"}, +// } +// qrepConfig.SetupWatermarkTableOnDestination = true +// require.NoError(s.t, err) + +// e2e.RunQrepFlowWorkflow(env, qrepConfig) + +// // Verify workflow completes without error +// require.True(s.t, env.IsWorkflowCompleted()) + +// err = env.GetWorkflowError() +// require.NoError(s.t, err) + +// err = s.chHelper.checkSyncedAt(fmt.Sprintf(`SELECT "_PEERDB_SYNCED_AT" FROM %s.%s`, +// s.chHelper.testSchemaName, 
tblName)) +// require.NoError(s.t, err) +// } diff --git a/flow/e2e/congen.go b/flow/e2e/congen.go index 73450d130d..a3a3e52631 100644 --- a/flow/e2e/congen.go +++ b/flow/e2e/congen.go @@ -19,7 +19,7 @@ const ( postgresUser = "postgres" postgresPassword = "postgres" postgresDatabase = "postgres" - PostgresPort = 7132 + PostgresPort = 9999 ) func GetTestPostgresConf() *protos.PostgresConfig { diff --git a/flow/e2e/test_utils.go b/flow/e2e/test_utils.go index e412335a15..9a1216ab74 100644 --- a/flow/e2e/test_utils.go +++ b/flow/e2e/test_utils.go @@ -209,9 +209,290 @@ func SetupCDCFlowStatusQuery(t *testing.T, env *testsuite.TestWorkflowEnvironmen } } -func CreateTableForQRep(conn *pgx.Conn, suffix string, tableName string) error { +func CreateTableForQRepNew(conn *pgx.Conn, suffix string, tableName string, fieldNames []string) error { createMoodEnum := "CREATE TYPE mood AS ENUM ('happy', 'sad', 'angry');" + // Mapping of field names to field types + fieldTypes := map[string]string{ + "id": "UUID NOT NULL PRIMARY KEY", + "card_id": "UUID", + "from": "TIMESTAMP NOT NULL", + "price": "NUMERIC", + "created_at": "TIMESTAMP NOT NULL", + "updated_at": "TIMESTAMP NOT NULL", + "transaction_hash": "BYTEA", + "ownerable_type": "VARCHAR", + "ownerable_id": "UUID", + "user_nonce": "INTEGER", + "transfer_type": "INTEGER DEFAULT 0 NOT NULL", + "blockchain": "INTEGER NOT NULL", + "deal_type": "VARCHAR", + "deal_id": "UUID", + "ethereum_transaction_id": "UUID", + "ignore_price": "BOOLEAN DEFAULT false", + "card_eth_value": "DOUBLE PRECISION", + "paid_eth_price": "DOUBLE PRECISION", + "card_bought_notified": "BOOLEAN DEFAULT false NOT NULL", + "address": "NUMERIC(20,8)", + "account_id": "UUID", + "asset_id": "NUMERIC NOT NULL", + "status": "INTEGER", + "transaction_id": "UUID", + "settled_at": "TIMESTAMP", + "reference_id": "VARCHAR", + "settle_at": "TIMESTAMP", + "settlement_delay_reason": "INTEGER", + "f1": "text[]", + "f2": "bigint[]", + "f3": "int[]", + "f4": "varchar[]", + "f5": "jsonb", + "f6": "jsonb", + "f7": "jsonb", + "f8": "smallint", + "f9": "date[]", + "f10": "timestamp with time zone[]", + "f11": "timestamp without time zone[]", + "f12": "boolean[]", + "f13": "smallint[]", + "my_date": "DATE", + "old_date": "DATE", + "my_time": "TIME", + "my_mood": "mood", + "myh": "HSTORE", + "geometryPoint": "geometry(point)", + "geography_point": "geography(point)", + "geometry_linestring": "geometry(linestring)", + "geography_linestring": "geography(linestring)", + "geometry_polygon": "geometry(polygon)", + "geography_polygon": "geography(polygon)", + "nannu": "NUMERIC", + "myreal": "REAL", + "myreal2": "REAL", + "myreal3": "REAL", + } + + // Prepare the fields string + fields := make([]string, 0, len(fieldNames)) + for _, fieldName := range fieldNames { + fieldType, ok := fieldTypes[fieldName] + if !ok { + return fmt.Errorf("field type for %s not found", fieldName) + } + fields = append(fields, fmt.Sprintf("%s %s", fieldName, fieldType)) + } + + tblFieldStr := strings.Join(fields, ", ") + + var pgErr *pgconn.PgError + _, enumErr := conn.Exec(context.Background(), createMoodEnum) + if errors.As(enumErr, &pgErr) && pgErr.Code != pgerrcode.DuplicateObject && !utils.IsUniqueError(pgErr) { + return enumErr + } + + _, err := conn.Exec(context.Background(), fmt.Sprintf(` + CREATE TABLE e2e_test_%s.%s ( + %s + );`, + suffix, tableName, tblFieldStr)) + if err != nil { + return err + } + + return nil +} + +func PopulateSourceTableNew(conn *pgx.Conn, suffix string, tableName string, rowCount int, fieldNames []string) 
error { + var ids []string + var rows []string + + // Function to generate a random UUID + generateUUID := func() string { + return uuid.New().String() + } + + // Map of hardcoded values for each field + hardcodedValues := map[string]string{ + "from": "CURRENT_TIMESTAMP", + "created_at": "CURRENT_TIMESTAMP", + "updated_at": "CURRENT_TIMESTAMP", + "transaction_hash": "E'\\\\xDEADBEEF'", + "ownerable_type": "'type1'", + "deal_type": "'dealType1'", + "ignore_price": "false", + "card_bought_notified": "false", + "status": "1", + "settlement_delay_reason": "1", + "f1": "'{text1, text2}'", + "f2": "'{123, 456}'", + "f3": "'{789, 012}'", + "f4": "'{varchar1, varchar2}'", + "f5": `'"key": "value"'`, + "f6": `'[{"key1": "value1", "key2": "value2", "key3": "value3"}]'`, + "f7": `'{"key": "value"}'`, + "f8": "'15'", + "f9": "'{2023-09-09,2029-08-10}'", + "f10": `'{"2024-01-15 17:00:00+00","2024-01-16 14:30:00+00"}'`, + "f11": `'{"2026-01-17 10:00:00","2026-01-18 13:45:00"}'`, + "f12": "'{true, false}'", + "f13": "'{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}'", + "my_date": "CURRENT_DATE", + "my_time": "CURRENT_TIME", + "my_mood": "'happy'", + "myh": `'\"a\"=>\"b\"'`, + "geometryPoint": "'POINT(1 2)'", + "geography_point": "'POINT(40.7128 -74.0060)'", + "geometry_linestring": "'LINESTRING(0 0, 1 1, 2 2)'", + "geography_linestring": "'LINESTRING(-74.0060 40.7128, -73.9352 40.7306, -73.9123 40.7831)'", + "geometry_polygon": "'POLYGON((0 0, 0 1, 1 1, 1 0, 0 0))'", + "geography_polygon": "'POLYGON((0 0, 0 1, 1 1, 1 0, 0 0))'", + "nannu": "'NaN'", + "myreal": "'3.14159'", + "myreal2": "'1'", + "myreal3": "'1.0'", + } + + // Prepare the rows + for i := 0; i < rowCount-1; i++ { + id := generateUUID() + ids = append(ids, id) + + var fieldValues []string + for _, fieldName := range fieldNames { + switch fieldName { + case "id": + fieldValues = append(fieldValues, fmt.Sprintf("'%s'", id)) + case "card_id", "ownerable_id", "deal_id", "ethereum_transaction_id", "transaction_id", "asset_id", "account_id": + fieldValues = append(fieldValues, fmt.Sprintf("'%s'", generateUUID())) + default: + hardcodedValue, ok := hardcodedValues[fieldName] + if !ok { + return fmt.Errorf("hardcoded value for field %s not found", fieldName) + } + fieldValues = append(fieldValues, hardcodedValue) + } + } + + rows = append(rows, fmt.Sprintf("(%s)", strings.Join(fieldValues, ", "))) + } + + // Insert data into the table + _, err := conn.Exec(context.Background(), fmt.Sprintf(` + INSERT INTO e2e_test_%s.%s (%s) + VALUES %s; + `, suffix, tableName, strings.Join(fieldNames, ", "), strings.Join(rows, ", "))) + + if err != nil { + return err + } + + // Add a row where all the nullable fields are null + // _, err = conn.Exec(context.Background(), fmt.Sprintf(` + // INSERT INTO e2e_test_%s.%s ( + // id, from, updated_at, + // transfer_type, blockchain, card_bought_notified + // ) VALUES ( + // '%s',CURRENT_TIMESTAMP,CURRENT_TIMESTAMP, + // 0, 1, false + // ); + // `, suffix, tableName, generateUUID())) + + if err != nil { + return err + } + + return nil +} + +func GetOwnersSchemaNew(fieldNames []string) *model.QRecordSchema { + var fields []model.QField + + // Function to check if a field is present in the list + isIncluded := func(fieldName string) bool { + for _, name := range fieldNames { + if name == fieldName { + return true + } + } + return false + } + + // Append fields based on the provided list + appendField := func(field model.QField) { + if len(fieldNames) == 0 || isIncluded(field.Name) { + fields = append(fields, field) + } + } + + // Define 
all fields + allFields := []model.QField{ + {Name: "id", Type: qvalue.QValueKindString, Nullable: true}, + {Name: "card_id", Type: qvalue.QValueKindString, Nullable: true}, + {Name: "from", Type: qvalue.QValueKindTimestamp, Nullable: true}, + {Name: "price", Type: qvalue.QValueKindNumeric, Nullable: true}, + {Name: "created_at", Type: qvalue.QValueKindTimestamp, Nullable: true}, + {Name: "updated_at", Type: qvalue.QValueKindTimestamp, Nullable: true}, + {Name: "transaction_hash", Type: qvalue.QValueKindBytes, Nullable: true}, + {Name: "ownerable_type", Type: qvalue.QValueKindString, Nullable: true}, + {Name: "ownerable_id", Type: qvalue.QValueKindString, Nullable: true}, + {Name: "user_nonce", Type: qvalue.QValueKindInt64, Nullable: true}, + {Name: "transfer_type", Type: qvalue.QValueKindInt64, Nullable: true}, + {Name: "blockchain", Type: qvalue.QValueKindInt64, Nullable: true}, + {Name: "deal_type", Type: qvalue.QValueKindString, Nullable: true}, + {Name: "deal_id", Type: qvalue.QValueKindString, Nullable: true}, + {Name: "ethereum_transaction_id", Type: qvalue.QValueKindString, Nullable: true}, + {Name: "ignore_price", Type: qvalue.QValueKindBoolean, Nullable: true}, + {Name: "card_eth_value", Type: qvalue.QValueKindFloat64, Nullable: true}, + {Name: "paid_eth_price", Type: qvalue.QValueKindFloat64, Nullable: true}, + {Name: "card_bought_notified", Type: qvalue.QValueKindBoolean, Nullable: true}, + {Name: "address", Type: qvalue.QValueKindNumeric, Nullable: true}, + {Name: "account_id", Type: qvalue.QValueKindString, Nullable: true}, + {Name: "asset_id", Type: qvalue.QValueKindNumeric, Nullable: true}, + {Name: "status", Type: qvalue.QValueKindInt64, Nullable: true}, + {Name: "transaction_id", Type: qvalue.QValueKindString, Nullable: true}, + {Name: "settled_at", Type: qvalue.QValueKindTimestamp, Nullable: true}, + {Name: "reference_id", Type: qvalue.QValueKindString, Nullable: true}, + {Name: "settle_at", Type: qvalue.QValueKindTimestamp, Nullable: true}, + {Name: "settlement_delay_reason", Type: qvalue.QValueKindInt64, Nullable: true}, + {Name: "f1", Type: qvalue.QValueKindArrayString, Nullable: true}, + {Name: "f2", Type: qvalue.QValueKindArrayInt64, Nullable: true}, + {Name: "f3", Type: qvalue.QValueKindArrayInt32, Nullable: true}, + {Name: "f4", Type: qvalue.QValueKindArrayString, Nullable: true}, + {Name: "f5", Type: qvalue.QValueKindJSON, Nullable: true}, + {Name: "f6", Type: qvalue.QValueKindJSON, Nullable: true}, + {Name: "f7", Type: qvalue.QValueKindJSON, Nullable: true}, + {Name: "f8", Type: qvalue.QValueKindInt16, Nullable: true}, + // {Name: "f9", Type: qvalue.QValueKindArrayDate, Nullable: true}, + // {Name: "f10", Type: qvalue.QValueKindArrayTimestampTz, Nullable: true}, + // {Name: "f11", Type: qvalue.QValueKindArrayTimestamp, Nullable: true}, + // {Name: "f12", Type: qvalue.QValueKindArrayBoolean, Nullable: true}, + {Name: "f13", Type: qvalue.QValueKindArrayInt16, Nullable: true}, + {Name: "my_date", Type: qvalue.QValueKindDate, Nullable: true}, + {Name: "old_date", Type: qvalue.QValueKindDate, Nullable: true}, + {Name: "my_time", Type: qvalue.QValueKindTime, Nullable: true}, + {Name: "my_mood", Type: qvalue.QValueKindString, Nullable: true}, + {Name: "geometryPoint", Type: qvalue.QValueKindGeometry, Nullable: true}, + {Name: "geometry_linestring", Type: qvalue.QValueKindGeometry, Nullable: true}, + {Name: "geometry_polygon", Type: qvalue.QValueKindGeometry, Nullable: true}, + {Name: "geography_point", Type: qvalue.QValueKindGeography, Nullable: true}, + {Name: 
"geography_linestring", Type: qvalue.QValueKindGeography, Nullable: true}, + {Name: "geography_polygon", Type: qvalue.QValueKindGeography, Nullable: true}, + {Name: "nannu", Type: qvalue.QValueKindFloat64, Nullable: true}, + {Name: "myreal", Type: qvalue.QValueKindFloat32, Nullable: true}, + {Name: "myreal2", Type: qvalue.QValueKindFloat32, Nullable: true}, + {Name: "myreal3", Type: qvalue.QValueKindFloat32, Nullable: true}, + } + + // Append selected fields to the final schema + for _, field := range allFields { + appendField(field) + } + + return &model.QRecordSchema{Fields: fields} +} + +func CreateTableForQRep(conn *pgx.Conn, suffix string, tableName string) error { + createMoodEnum := "CREATE TYPE mood AS ENUM ('happy', 'sad', 'angry');" tblFields := []string{ "id UUID NOT NULL PRIMARY KEY", "card_id UUID", @@ -277,16 +558,89 @@ func CreateTableForQRep(conn *pgx.Conn, suffix string, tableName string) error { return enumErr } _, err := conn.Exec(context.Background(), fmt.Sprintf(` - CREATE TABLE e2e_test_%s.%s ( - %s - );`, suffix, tableName, tblFieldStr)) + CREATE TABLE e2e_test_%s.%s ( + %s + );`, suffix, tableName, tblFieldStr)) if err != nil { return err } - return nil } +// func CreateTableForQRep(conn *pgx.Conn, suffix string, tableName string) error { +// createMoodEnum := "CREATE TYPE mood AS ENUM ('happy', 'sad', 'angry');" + +// tblFields := []string{ +// "id UUID NOT NULL PRIMARY KEY", +// "card_id UUID", +// `"from" TIMESTAMP NOT NULL`, +// "price NUMERIC", +// "created_at TIMESTAMP NOT NULL", +// "updated_at TIMESTAMP NOT NULL", +// "transaction_hash BYTEA", +// "ownerable_type VARCHAR", +// "ownerable_id UUID", +// "user_nonce INTEGER", +// "transfer_type INTEGER DEFAULT 0 NOT NULL", +// "blockchain INTEGER NOT NULL", +// "deal_type VARCHAR", +// "deal_id UUID", +// "ethereum_transaction_id UUID", +// "ignore_price BOOLEAN DEFAULT false", +// "card_eth_value DOUBLE PRECISION", +// "paid_eth_price DOUBLE PRECISION", +// "card_bought_notified BOOLEAN DEFAULT false NOT NULL", +// "address NUMERIC(20,8)", +// "account_id UUID", +// "asset_id NUMERIC NOT NULL", +// "status INTEGER", +// "transaction_id UUID", +// "settled_at TIMESTAMP", +// "reference_id VARCHAR", +// "settle_at TIMESTAMP", +// "settlement_delay_reason INTEGER", +// "f1 text[]", +// "f2 bigint[]", +// "f3 int[]", +// "f4 varchar[]", +// "f5 jsonb", +// "f6 jsonb", +// "f7 jsonb", +// "f8 smallint", +// "f9 date[]", +// "f10 timestamp with time zone[]", +// "f11 timestamp without time zone[]", +// "f12 boolean[]", +// "f13 smallint[]", +// "my_date DATE", +// "old_date DATE", +// "my_time TIME", +// "my_mood mood", +// "myh HSTORE", +// `"geometryPoint" geometry(point)`, +// "geography_point geography(point)", +// "geometry_linestring geometry(linestring)", +// "geography_linestring geography(linestring)", +// "geometry_polygon geometry(polygon)", +// "geography_polygon geography(polygon)", +// } +// tblFieldStr := strings.Join(tblFields, ",") +// var pgErr *pgconn.PgError +// _, enumErr := conn.Exec(context.Background(), createMoodEnum) +// if errors.As(enumErr, &pgErr) && pgErr.Code != pgerrcode.DuplicateObject && !utils.IsUniqueError(pgErr) { +// return enumErr +// } +// _, err := conn.Exec(context.Background(), fmt.Sprintf(` +// CREATE TABLE e2e_test_%s.%s ( +// %s +// );`, suffix, tableName, tblFieldStr)) +// if err != nil { +// return err +// } + +// return nil +// } + func generate20MBJson() ([]byte, error) { xn := make(map[string]interface{}, 215000) for i := 0; i < 215000; i++ { @@ -336,7 +690,6 @@ 
func PopulateSourceTable(conn *pgx.Conn, suffix string, tableName string, rowCou
 			uuid.New().String(), uuid.New().String(), uuid.New().String(), uuid.New().String())
 		rows = append(rows, row)
 	}
-
 	_, err := conn.Exec(context.Background(), fmt.Sprintf(`
 			INSERT INTO e2e_test_%s.%s (
 					id, card_id, "from", price, created_at,
@@ -359,42 +712,130 @@ func PopulateSourceTable(conn *pgx.Conn, suffix string, tableName string, rowCou
 	}
 
 	// add a row where all the nullable fields are null
-	_, err = conn.Exec(context.Background(), fmt.Sprintf(`
-	INSERT INTO e2e_test_%s.%s (
-		id, "from", created_at, updated_at,
-		transfer_type, blockchain, card_bought_notified, asset_id
-	) VALUES (
-		'%s', CURRENT_TIMESTAMP, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP,
-		0, 1, false, 12345
-	);
-	`, suffix, tableName, uuid.New().String()))
-	if err != nil {
-		return err
-	}
-
+	// _, err = conn.Exec(context.Background(), fmt.Sprintf(`
+	// INSERT INTO e2e_test_%s.%s (
+	// 	id, from, updated_at,
+	// 	transfer_type, blockchain, card_bought_notified
+	// ) VALUES (
+	// 	'%s',CURRENT_TIMESTAMP,CURRENT_TIMESTAMP,
+	// 	0, 1, false
+	// );
+	// `, suffix, tableName, uuid.New().String()))
+	// if err != nil {
+	// 	return err
+	// }
 	// generate a 20 MB json and update id[0]'s col f5 to it
-	v, err := generate20MBJson()
-	if err != nil {
-		return err
-	}
-	_, err = conn.Exec(context.Background(), fmt.Sprintf(`
-		UPDATE e2e_test_%s.%s SET f5 = $1 WHERE id = $2;
-	`, suffix, tableName), v, ids[0])
-	if err != nil {
-		return err
-	}
+	// starts here *************
+	// v, err := generate20MBJson()
+	// if err != nil {
+
+	// 	return err
+	// }
+
+	// _, err = conn.Exec(context.Background(), fmt.Sprintf(`
+	// 	UPDATE e2e_test_%s.%s SET f8 = 5 WHERE id = $2;
+	// `, suffix, tableName), v, ids[0])
+	// if err != nil {
+	// 	return err
+	// }
-	// update my_date to a date before 1970
+	// update ownerable_type for the first row
 	_, err = conn.Exec(context.Background(), fmt.Sprintf(`
-		UPDATE e2e_test_%s.%s SET old_date = '1950-01-01' WHERE id = $1;
-	`, suffix, tableName), ids[0])
+		UPDATE e2e_test_%s.%s SET ownerable_type = 'Hello' WHERE id = $1;
+	`, suffix, tableName), ids[0])
 	if err != nil {
 		return err
 	}
-
 	return nil
 }
+
+// func PopulateSourceTable(conn *pgx.Conn, suffix string, tableName string, rowCount int) error {
+// 	var ids []string
+// 	var rows []string
+// 	for i := 0; i < rowCount-1; i++ {
+// 		id := uuid.New().String()
+// 		ids = append(ids, id)
+// 		row := fmt.Sprintf(`
+// 		(
+// 			'%s', '%s', CURRENT_TIMESTAMP, 3.86487206688919, CURRENT_TIMESTAMP,
+// 			CURRENT_TIMESTAMP, E'\\\\xDEADBEEF', 'type1', '%s',
+// 			1, 0, 1, 'dealType1',
+// 			'%s', '%s', false, 1.2345,
+// 			1.2345, false, 200.12345678, '%s',
+// 			200, 1, '%s', CURRENT_TIMESTAMP, 'refID',
+// 			CURRENT_TIMESTAMP, 1, ARRAY['text1', 'text2'], ARRAY[123, 456], ARRAY[789, 012],
+// 			ARRAY['varchar1', 'varchar2'], '{"key": -8.02139037433155}',
+// 			'[{"key1": "value1", "key2": "value2", "key3": "value3"}]',
+// 			'{"key": "value"}', 15,'{2023-09-09,2029-08-10}',
+// 			'{"2024-01-15 17:00:00+00","2024-01-16 14:30:00+00"}',
+// 			'{"2026-01-17 10:00:00","2026-01-18 13:45:00"}',
+// 			'{true, false}',
+// 			'{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}',
+// 			CURRENT_DATE, CURRENT_TIME,'happy', '"a"=>"b"','POINT(1 2)','POINT(40.7128 -74.0060)',
+// 			'LINESTRING(0 0, 1 1, 2 2)',
+// 			'LINESTRING(-74.0060 40.7128, -73.9352 40.7306, -73.9123 40.7831)',
+// 			'POLYGON((0 0, 0 1, 1 1, 1 0, 0 0))','POLYGON((0 0, 0 1, 1 1, 1 0, 0 0))'
+// 		)`,
+// 			id, uuid.New().String(), uuid.New().String(),
+// 			uuid.New().String(), uuid.New().String(), 
uuid.New().String(), uuid.New().String()) +// rows = append(rows, row) +// } + +// _, err := conn.Exec(context.Background(), fmt.Sprintf(` +// INSERT INTO e2e_test_%s.%s ( +// id, card_id, "from", price, created_at, +// updated_at, transaction_hash, ownerable_type, ownerable_id, +// user_nonce, transfer_type, blockchain, deal_type, +// deal_id, ethereum_transaction_id, ignore_price, card_eth_value, +// paid_eth_price, card_bought_notified, address, account_id, +// asset_id, status, transaction_id, settled_at, reference_id, +// settle_at, settlement_delay_reason, f1, f2, f3, f4, f5, f6, f7, f8, f9, f10, f11, f12, f13, my_date, +// my_time, my_mood, myh, +// "geometryPoint", geography_point,geometry_linestring, geography_linestring,geometry_polygon, geography_polygon +// ) VALUES %s; +// `, suffix, tableName, strings.Join(rows, ","))) +// if err != nil { +// return err +// } + +// // add a row where all the nullable fields are null +// _, err = conn.Exec(context.Background(), fmt.Sprintf(` +// INSERT INTO e2e_test_%s.%s ( +// id, "from", created_at, updated_at, +// transfer_type, blockchain, card_bought_notified, asset_id +// ) VALUES ( +// '%s', CURRENT_TIMESTAMP, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP, +// 0, 1, false, 12345 +// ); +// `, suffix, tableName, uuid.New().String())) +// if err != nil { +// return err +// } + +// // generate a 20 MB json and update id[0]'s col f5 to it +// v, err := generate20MBJson() +// if err != nil { +// return err +// } +// _, err = conn.Exec(context.Background(), fmt.Sprintf(` +// UPDATE e2e_test_%s.%s SET f5 = $1 WHERE id = $2; +// `, suffix, tableName), v, ids[0]) +// if err != nil { +// return err +// } + +// // update my_date to a date before 1970 +// _, err = conn.Exec(context.Background(), fmt.Sprintf(` +// UPDATE e2e_test_%s.%s SET old_date = '1950-01-01' WHERE id = $1; +// `, suffix, tableName), ids[0]) +// if err != nil { +// return err +// } + +// return nil +// } + func CreateQRepWorkflowConfig( flowJobName string, sourceTable string, @@ -515,6 +956,26 @@ func GetOwnersSelectorStringsSF() [2]string { return [2]string{strings.Join(pgFields, ","), strings.Join(sfFields, ",")} } +func GetOwnersSelectorStringsCH(fieldNames []string) [2]string { + schema := GetOwnersSchemaNew(fieldNames) + pgFields := make([]string, 0, len(schema.Fields)) + sfFields := make([]string, 0, len(schema.Fields)) + for _, field := range schema.Fields { + pgFields = append(pgFields, fmt.Sprintf(`"%s"`, field.Name)) + if strings.Contains(field.Name, "geo") { + colName := connsnowflake.SnowflakeIdentifierNormalize(field.Name) + + // Have to apply a WKT transformation here, + // else the sql driver we use receives the values as snowflake's OBJECT + // which is troublesome to deal with. Now it receives it as string. + sfFields = append(sfFields, fmt.Sprintf(`ST_ASWKT(%s) as %s`, colName, colName)) + } else { + sfFields = append(sfFields, field.Name) + } + } + return [2]string{strings.Join(pgFields, ","), strings.Join(sfFields, ",")} +} + type testWriter struct { *testing.T }
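If the destination tables are created as ReplacingMergeTree (as the engine hack elsewhere in this patch suggests), deduplication only happens when ClickHouse merges parts, so a comparison or count that runs right after a sync can observe duplicate row versions. A minimal sketch of a dedup-aware count check using ClickHouse's FINAL modifier; verifyRowCount, its parameters, and the database/sql handle are illustrative assumptions, not part of this patch:

// Hedged sketch: deterministic row counting against a ReplacingMergeTree table.
package main

import (
	"database/sql"
	"fmt"
)

// verifyRowCount returns the deduplicated row count of a ReplacingMergeTree table.
// FINAL collapses duplicate rows sharing the same ORDER BY key at query time,
// at the cost of extra work on the read path, which is acceptable in tests.
func verifyRowCount(db *sql.DB, database, table string) (int, error) {
	query := fmt.Sprintf("SELECT COUNT(*) FROM %s.%s FINAL", database, table)
	var count int
	if err := db.QueryRow(query).Scan(&count); err != nil {
		return 0, fmt.Errorf("row count check failed: %w", err)
	}
	return count, nil
}

A test helper like compareTableContentsWithDiffSelectorsCH could apply the same idea by reading the ClickHouse side with FINAL (or by forcing OPTIMIZE TABLE ... FINAL beforehand) so that equality checks do not flake on unmerged duplicates.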