Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Clickhouse cdc e2e #1246

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docker-compose-dev.yml
Original file line number Diff line number Diff line change
Expand Up @@ -215,4 +215,4 @@ services:
- flow-api

volumes:
pgdata:
pgdata:
2 changes: 1 addition & 1 deletion flow/connectors/clickhouse/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func (c *ClickhouseConnector) syncRecordsViaAvro(
DestinationTableIdentifier: strings.ToLower(rawTableIdentifier),
}
avroSyncer := NewClickhouseAvroSyncMethod(qrepConfig, c)
destinationTableSchema, err := c.getTableSchema(qrepConfig.DestinationTableIdentifier)
destinationTableSchema, err := c.GetTableSchema(qrepConfig.DestinationTableIdentifier)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion flow/connectors/clickhouse/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func NewClickhouseClient(ctx context.Context, config *protos.ClickhouseConfig) (
}

genericExecutor := *peersql.NewGenericSQLQueryExecutor(
ctx, database, clickhouseTypeToQValueKindMap, qvalue.QValueKindToSnowflakeTypeMap)
ctx, database, clickhouseTypeToQValueKindMap, qvalue.QValueKindToClickhouseTypeMap)

return &ClickhouseClient{
GenericSQLQueryExecutor: genericExecutor,
Expand Down
6 changes: 4 additions & 2 deletions flow/connectors/clickhouse/qrep.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ func (c *ClickhouseConnector) SyncQRepRecords(
partition *protos.QRepPartition,
stream *model.QRecordStream,
) (int, error) {

fmt.Printf("\n *********************............SyncQRepRecords 1")
// Ensure the destination table is available.
destTable := config.DestinationTableIdentifier
flowLog := slog.Group("sync_metadata",
Expand All @@ -41,7 +43,7 @@ func (c *ClickhouseConnector) SyncQRepRecords(
return 0, nil
}

tblSchema, err := c.getTableSchema(destTable)
tblSchema, err := c.GetTableSchema(destTable)
if err != nil {
return 0, fmt.Errorf("failed to get schema of table %s: %w", destTable, err)
}
Expand Down Expand Up @@ -76,7 +78,7 @@ func (c *ClickhouseConnector) createMetadataInsertStatement(
return insertMetadataStmt, nil
}

func (c *ClickhouseConnector) getTableSchema(tableName string) ([]*sql.ColumnType, error) {
func (c *ClickhouseConnector) GetTableSchema(tableName string) ([]*sql.ColumnType, error) {
//nolint:gosec
queryString := fmt.Sprintf(`SELECT * FROM %s LIMIT 0`, tableName)
//nolint:rowserrcheck
Expand Down
2 changes: 2 additions & 0 deletions flow/connectors/clickhouse/qrep_avro_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,8 @@ func (s *ClickhouseAvroSyncMethod) SyncQRepRecords(
return 0, err
}

fmt.Printf("\n*********************************............SyncQRepRecords 2 avroFile %+v\n", avroFile)

s3o, err := utils.NewS3BucketAndPrefix(stagingPath)
if err != nil {
return 0, err
Expand Down
4 changes: 4 additions & 0 deletions flow/connectors/sql/query_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ func (g *GenericSQLQueryExecutor) CreateTable(schema *model.QRecordSchema, schem
fields := make([]string, 0, len(schema.Fields))
for _, field := range schema.Fields {
dbType, ok := g.qvalueKindToDBType[field.Type]
//fmt.Printf("\n***********fieldType: %s, dbType: %s\n", field.Type, dbType)
if !ok {
return fmt.Errorf("unsupported qvalue type %s", field.Type)
}
Expand All @@ -115,6 +116,9 @@ func (g *GenericSQLQueryExecutor) CreateTable(schema *model.QRecordSchema, schem

command := fmt.Sprintf("CREATE TABLE %s.%s (%s)", schemaName, tableName, strings.Join(fields, ", "))

if strings.Contains(tableName, "_ch_") {
command += " ENGINE = ReplacingMergeTree() ORDER BY id"
}
_, err := g.db.ExecContext(g.ctx, command)
if err != nil {
return fmt.Errorf("failed to create table: %w", err)
Expand Down
11 changes: 11 additions & 0 deletions flow/e2e/clickhouse/ch.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
{
"host": "localhost",
"port": 9000,
"user": "clickhouse",
"password": "clickhouse",
"database": "desti",
"s3_path": "s3://peerdb-test-bucket",
"access_key_id":"",
"secret_access_key": "",
"region": "us-east-2"
}
201 changes: 201 additions & 0 deletions flow/e2e/clickhouse/clickhouse_helper.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,201 @@
package e2e_clickhouse

import (
"context"
"encoding/json"
"fmt"
"math/big"
"os"
"time"

connclickhouse "github.com/PeerDB-io/peer-flow/connectors/clickhouse"
"github.com/PeerDB-io/peer-flow/e2eshared"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/model"
"github.com/PeerDB-io/peer-flow/model/qvalue"
"github.com/PeerDB-io/peer-flow/shared"
)

// ClickhouseTestHelper bundles everything a Clickhouse e2e test needs:
// the parsed connection config, a Peer proto for flow setup, and two live
// connections — one admin connection for managing the per-run test
// database, and one connection into that test database.
type ClickhouseTestHelper struct {
	// config is the Clickhouse config.
	// NOTE(review): NewClickhouseTestHelper rewrites Config.Database to the
	// per-run test database before returning.
	Config *protos.ClickhouseConfig
	// peer struct holder Clickhouse
	Peer *protos.Peer
	// connection to another database, to manage the test database
	adminClient *connclickhouse.ClickhouseClient
	// connection to the test database
	testClient *connclickhouse.ClickhouseClient
	// testSchemaName is the schema to use for testing.
	// (Set to the test database name — schema == database here.)
	testSchemaName string
	// dbName is the database used for testing.
	testDatabaseName string
}

// NewClickhouseTestHelper reads Clickhouse credentials from the JSON file
// named by the TEST_CH_CREDS environment variable, connects with an admin
// client, creates a uniquely named per-run test database, and returns a
// helper holding both the admin connection and a connection into the new
// test database.
//
// On any failure after the admin connection is established, the admin
// connection is closed (and the test database, if already created, is
// dropped best-effort) so nothing is leaked.
func NewClickhouseTestHelper() (*ClickhouseTestHelper, error) {
	jsonPath := os.Getenv("TEST_CH_CREDS")
	if jsonPath == "" {
		return nil, fmt.Errorf("TEST_CH_CREDS env var not set")
	}

	content, err := e2eshared.ReadFileToBytes(jsonPath)
	if err != nil {
		return nil, fmt.Errorf("failed to read file: %w", err)
	}

	var config *protos.ClickhouseConfig
	err = json.Unmarshal(content, &config)
	if err != nil {
		return nil, fmt.Errorf("failed to unmarshal json: %w", err)
	}

	peer := generateCHPeer(config)
	runID, err := shared.RandomUInt64()
	if err != nil {
		return nil, fmt.Errorf("failed to generate random uint64: %w", err)
	}

	// Unique database per run so concurrent test runs cannot collide.
	testDatabaseName := fmt.Sprintf("e2e_test_%d", runID)

	adminClient, err := connclickhouse.NewClickhouseClient(context.Background(), config)
	if err != nil {
		return nil, fmt.Errorf("failed to create Clickhouse client: %w", err)
	}
	err = adminClient.ExecuteQuery(fmt.Sprintf("CREATE DATABASE %s", testDatabaseName))
	if err != nil {
		// Don't leak the admin connection on failure.
		_ = adminClient.Close()
		return nil, fmt.Errorf("failed to create Clickhouse test database: %w", err)
	}

	config.Database = testDatabaseName
	testClient, err := connclickhouse.NewClickhouseClient(context.Background(), config)
	if err != nil {
		// Best-effort cleanup: drop the database we just created and close
		// the admin connection before bailing out.
		_ = adminClient.ExecuteQuery(fmt.Sprintf("DROP DATABASE %s", testDatabaseName))
		_ = adminClient.Close()
		return nil, fmt.Errorf("failed to create Clickhouse test client: %w", err)
	}

	return &ClickhouseTestHelper{
		Config:           config,
		Peer:             peer,
		adminClient:      adminClient,
		testClient:       testClient,
		testSchemaName:   testDatabaseName,
		testDatabaseName: testDatabaseName,
	}, nil
}

// generateCHPeer wraps the given Clickhouse config in a Peer proto named
// "test_ch_peer" with database type CLICKHOUSE.
func generateCHPeer(clickhouseConfig *protos.ClickhouseConfig) *protos.Peer {
	return &protos.Peer{
		Name: "test_ch_peer",
		Type: protos.DBType_CLICKHOUSE,
		Config: &protos.Peer_ClickhouseConfig{
			ClickhouseConfig: clickhouseConfig,
		},
	}
}

// Cleanup tears down the per-run test database: it closes the test
// connection, drops the test database, and closes the admin connection.
// The admin connection is closed even when DROP DATABASE fails, so no
// connection is leaked; the first error encountered is returned.
func (s *ClickhouseTestHelper) Cleanup() error {
	err := s.testClient.Close()
	if err != nil {
		return err
	}
	dropErr := s.adminClient.ExecuteQuery(fmt.Sprintf("DROP DATABASE %s", s.testDatabaseName))
	closeErr := s.adminClient.Close()
	if dropErr != nil {
		return dropErr
	}
	return closeErr
}

// RunCommand executes the given SQL statement against the test database.
func (s *ClickhouseTestHelper) RunCommand(command string) error {
	err := s.testClient.ExecuteQuery(command)
	return err
}

// CountRows returns the number of rows in tableName within the test schema.
func (s *ClickhouseTestHelper) CountRows(tableName string) (int, error) {
	count, err := s.testClient.CountRows(s.testSchemaName, tableName)
	if err != nil {
		return 0, err
	}
	return int(count), nil
}

// CountNonNullRows returns the number of rows in tableName whose columnName
// value is non-NULL.
func (s *ClickhouseTestHelper) CountNonNullRows(tableName string, columnName string) (int, error) {
	count, err := s.testClient.CountNonNullRows(s.testSchemaName, tableName, columnName)
	if err != nil {
		return 0, err
	}
	return int(count), nil
}

// CheckNull delegates to the test client's NULL check for the given columns
// of tableName in the test schema; see ClickhouseClient.CheckNull for the
// exact semantics of the returned bool.
func (s *ClickhouseTestHelper) CheckNull(tableName string, colNames []string) (bool, error) {
	ok, err := s.testClient.CheckNull(s.testSchemaName, tableName, colNames)
	return ok, err
}

// ExecuteAndProcessQuery runs the query against the test database and
// returns the resulting record batch.
func (s *ClickhouseTestHelper) ExecuteAndProcessQuery(query string) (*model.QRecordBatch, error) {
	batch, err := s.testClient.ExecuteAndProcessQuery(query)
	return batch, err
}

// CreateTable creates tableName in the test schema with the given column
// schema.
func (s *ClickhouseTestHelper) CreateTable(tableName string, schema *model.QRecordSchema) error {
	err := s.testClient.CreateTable(schema, s.testSchemaName, tableName)
	return err
}

// RunIntQuery executes a query that must yield exactly one row with exactly
// one column, and converts that single value to an int. Int32, Int64 and
// Numeric (*big.Rat, truncating integer division) results are supported;
// anything else is an error.
func (s *ClickhouseTestHelper) RunIntQuery(query string) (int, error) {
	batch, err := s.testClient.ExecuteAndProcessQuery(query)
	if err != nil {
		return 0, err
	}

	// A nil batch counts as zero rows.
	rowCount := 0
	if batch != nil {
		rowCount = len(batch.Records)
	}
	if rowCount != 1 {
		return 0, fmt.Errorf("failed to execute query: %s, returned %d != 1 rows", query, rowCount)
	}

	row := batch.Records[0]
	if len(row) != 1 {
		return 0, fmt.Errorf("failed to execute query: %s, returned %d != 1 columns", query, len(row))
	}

	cell := row[0]
	switch cell.Kind {
	case qvalue.QValueKindInt32:
		return int(cell.Value.(int32)), nil
	case qvalue.QValueKindInt64:
		return int(cell.Value.(int64)), nil
	case qvalue.QValueKindNumeric:
		// Collapse the rational to an int via truncating division.
		rat := cell.Value.(*big.Rat)
		return int(rat.Num().Int64() / rat.Denom().Int64()), nil
	default:
		return 0, fmt.Errorf("failed to execute query: %s, returned value of type %s", query, cell.Kind)
	}
}

// checkSyncedAt runs the given query and verifies that every returned value
// is a timestamp holding a time.Time — used to confirm the
// _PEERDB_SYNCED_AT column was populated correctly.
func (s *ClickhouseTestHelper) checkSyncedAt(query string) error {
	recordBatch, err := s.testClient.ExecuteAndProcessQuery(query)
	if err != nil {
		return err
	}

	for _, row := range recordBatch.Records {
		for _, cell := range row {
			if cell.Kind != qvalue.QValueKindTimestamp {
				return fmt.Errorf("synced_at column check failed: _PEERDB_SYNCED_AT is not timestamp")
			}
			if _, ok := cell.Value.(time.Time); !ok {
				return fmt.Errorf("synced_at column failed: _PEERDB_SYNCED_AT is not valid")
			}
		}
	}

	return nil
}
Loading
Loading