CI: ClickHouse (#1872)
serprex authored Jul 4, 2024
1 parent dd7785c commit 9944e16
Showing 9 changed files with 200 additions and 22 deletions.
2 changes: 1 addition & 1 deletion .github/actions/genprotos/action.yml
@@ -17,7 +17,7 @@ runs:
- if: steps.cache.outputs.cache-hit != 'true'
uses: actions/setup-go@v5
with:
- go-version: '1.22.3'
+ go-version: '1.22.5'
cache: false
- if: steps.cache.outputs.cache-hit != 'true'
uses: bufbuild/[email protected]
3 changes: 3 additions & 0 deletions .github/workflows/flow.yml
@@ -103,6 +103,9 @@ jobs:
with:
version: "latest"

+ - name: start clickhouse
+ uses: getsentry/action-clickhouse-in-ci@v1

- name: Install Temporal CLI
uses: temporalio/setup-temporal@v0

4 changes: 2 additions & 2 deletions flow/connectors/clickhouse/clickhouse.go
@@ -84,7 +84,7 @@ func NewClickhouseConnector(
config *protos.ClickhouseConfig,
) (*ClickhouseConnector, error) {
logger := logger.LoggerFromCtx(ctx)
- database, err := connect(ctx, config)
+ database, err := Connect(ctx, config)
if err != nil {
return nil, fmt.Errorf("failed to open connection to Clickhouse peer: %w", err)
}
@@ -168,7 +168,7 @@
}, nil
}

- func connect(ctx context.Context, config *protos.ClickhouseConfig) (*sql.DB, error) {
+ func Connect(ctx context.Context, config *protos.ClickhouseConfig) (*sql.DB, error) {
var tlsSetting *tls.Config
if !config.DisableTls {
tlsSetting = &tls.Config{MinVersion: tls.VersionTLS13}
174 changes: 174 additions & 0 deletions flow/e2e/clickhouse/clickhouse.go
@@ -0,0 +1,174 @@
package e2e_clickhouse

import (
"context"
"fmt"
"strings"
"testing"

"github.com/jackc/pgx/v5"
"github.com/stretchr/testify/require"

"github.com/PeerDB-io/peer-flow/connectors"
"github.com/PeerDB-io/peer-flow/connectors/clickhouse"
connpostgres "github.com/PeerDB-io/peer-flow/connectors/postgres"
"github.com/PeerDB-io/peer-flow/e2e"
"github.com/PeerDB-io/peer-flow/e2e/s3"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/model"
"github.com/PeerDB-io/peer-flow/model/qvalue"
"github.com/PeerDB-io/peer-flow/shared"
)

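// ClickHouseSuite wires together a Postgres source, an S3 staging helper, and a per-test ClickHouse database for the e2e tests.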
type ClickHouseSuite struct {
t *testing.T
conn *connpostgres.PostgresConnector
s3Helper *e2e_s3.S3TestHelper
suffix string
}

func (s ClickHouseSuite) T() *testing.T {
return s.t
}

func (s ClickHouseSuite) Connector() *connpostgres.PostgresConnector {
return s.conn
}

func (s ClickHouseSuite) DestinationConnector() connectors.Connector {
// TODO have CH connector
return nil
}

func (s ClickHouseSuite) Conn() *pgx.Conn {
return s.Connector().Conn()
}

func (s ClickHouseSuite) Suffix() string {
return s.suffix
}

func (s ClickHouseSuite) Peer() *protos.Peer {
return s.PeerForDatabase("e2e_test_" + s.suffix)
}

func (s ClickHouseSuite) PeerForDatabase(dbname string) *protos.Peer {
ret := &protos.Peer{
Name: e2e.AddSuffix(s, dbname),
Type: protos.DBType_CLICKHOUSE,
Config: &protos.Peer_ClickhouseConfig{
ClickhouseConfig: &protos.ClickhouseConfig{
Host: "localhost",
Port: 9000,
Database: dbname,
S3Path: s.s3Helper.BucketName,
AccessKeyId: *s.s3Helper.S3Config.AccessKeyId,
SecretAccessKey: *s.s3Helper.S3Config.SecretAccessKey,
Region: *s.s3Helper.S3Config.Region,
DisableTls: true,
Endpoint: s.s3Helper.S3Config.Endpoint,
},
},
}
e2e.CreatePeer(s.t, ret)
return ret
}

func (s ClickHouseSuite) DestinationTable(table string) string {
return table
}

func (s ClickHouseSuite) Teardown() {
require.NoError(s.t, s.s3Helper.CleanUp(context.Background()))
e2e.TearDownPostgres(s)
}

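// GetRows reads the requested columns back from the destination ClickHouse table and converts them into a QRecordBatch for assertions.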
func (s ClickHouseSuite) GetRows(table string, cols string) (*model.QRecordBatch, error) {
ch, err := connclickhouse.Connect(context.Background(), s.Peer().GetClickhouseConfig())
if err != nil {
return nil, err
}

rows, err := ch.Query(
fmt.Sprintf(`SELECT %s FROM e2e_test_%s.%s ORDER BY id`, cols, s.suffix, table),
)
if err != nil {
return nil, err
}

batch := &model.QRecordBatch{}
types, err := rows.ColumnTypes()
if err != nil {
return nil, err
}
row := make([]interface{}, 0, len(types))
for _, ty := range types {
prec, scale, _ := ty.DecimalSize()
nullable, _ := ty.Nullable()
var qkind qvalue.QValueKind
switch ty.DatabaseTypeName() {
case "String":
var val string
row = append(row, &val)
qkind = qvalue.QValueKindString
case "Int32":
var val int32
row = append(row, &val)
qkind = qvalue.QValueKindInt32
default:
return nil, fmt.Errorf("failed to resolve QValueKind for %s", ty.DatabaseTypeName())
}
batch.Schema.Fields = append(batch.Schema.Fields, qvalue.QField{
Name: ty.Name(),
Type: qkind,
Precision: int16(prec),
Scale: int16(scale),
Nullable: nullable,
})
}

for rows.Next() {
if err := rows.Scan(row...); err != nil {
return nil, err
}
qrow := make([]qvalue.QValue, 0, len(row))
for _, val := range row {
switch v := val.(type) {
case *string:
qrow = append(qrow, qvalue.QValueString{Val: *v})
case *int32:
qrow = append(qrow, qvalue.QValueInt32{Val: *v})
default:
return nil, fmt.Errorf("cannot convert %T to qvalue", v)
}
}
batch.Records = append(batch.Records, qrow)
}

return batch, rows.Err()
}

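// SetupSuite provisions the Postgres source, the S3 helper, and a fresh ClickHouse database named e2e_test_<suffix> for one test run.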
func SetupSuite(t *testing.T) ClickHouseSuite {
t.Helper()

suffix := "ch_" + strings.ToLower(shared.RandomString(8))
conn, err := e2e.SetupPostgres(t, suffix)
require.NoError(t, err, "failed to setup postgres")

s3Helper, err := e2e_s3.NewS3TestHelper(false)
require.NoError(t, err, "failed to setup S3")

s := ClickHouseSuite{
t: t,
conn: conn,
suffix: suffix,
s3Helper: s3Helper,
}

ch, err := connclickhouse.Connect(context.Background(), s.PeerForDatabase("default").GetClickhouseConfig())
require.NoError(t, err, "failed to connect to clickhouse")
_, err = ch.Exec("CREATE DATABASE e2e_test_" + suffix)
require.NoError(t, err, "failed to create clickhouse database")

return s
}
5 changes: 5 additions & 0 deletions flow/e2e/generic/generic_test.go
@@ -10,6 +10,7 @@ import (
"github.com/PeerDB-io/peer-flow/connectors"
"github.com/PeerDB-io/peer-flow/e2e"
e2e_bigquery "github.com/PeerDB-io/peer-flow/e2e/bigquery"
+ e2e_clickhouse "github.com/PeerDB-io/peer-flow/e2e/clickhouse"
e2e_postgres "github.com/PeerDB-io/peer-flow/e2e/postgres"
e2e_snowflake "github.com/PeerDB-io/peer-flow/e2e/snowflake"
"github.com/PeerDB-io/peer-flow/e2eshared"
@@ -30,6 +31,10 @@ func TestGenericBQ(t *testing.T) {
e2eshared.RunSuite(t, SetupGenericSuite(e2e_bigquery.SetupSuite))
}

+ func TestGenericCH(t *testing.T) {
+ e2eshared.RunSuite(t, SetupGenericSuite(e2e_clickhouse.SetupSuite))
+ }

type Generic struct {
e2e.GenericSuite
}
6 changes: 3 additions & 3 deletions flow/e2e/s3/qrep_flow_s3_test.go
@@ -42,7 +42,7 @@ func (s PeerFlowE2ETestSuiteS3) Peer() *protos.Peer {
Name: e2e.AddSuffix(s, "s3peer"),
Type: protos.DBType_S3,
Config: &protos.Peer_S3Config{
- S3Config: s.s3Helper.s3Config,
+ S3Config: s.s3Helper.S3Config,
},
}
e2e.CreatePeer(s.t, ret)
@@ -130,7 +130,7 @@ func (s PeerFlowE2ETestSuiteS3) Test_Complete_QRep_Flow_S3() {
"",
"",
)
- qrepConfig.StagingPath = s.s3Helper.s3Config.Url
+ qrepConfig.StagingPath = s.s3Helper.S3Config.Url

env := e2e.RunQRepFlowWorkflow(tc, qrepConfig)
e2e.EnvWaitForFinished(s.t, env, 3*time.Minute)
@@ -172,7 +172,7 @@ func (s PeerFlowE2ETestSuiteS3) Test_Complete_QRep_Flow_S3_CTID() {
"",
"",
)
- qrepConfig.StagingPath = s.s3Helper.s3Config.Url
+ qrepConfig.StagingPath = s.s3Helper.S3Config.Url
qrepConfig.NumRowsPerPartition = 2000
qrepConfig.InitialCopyOnly = true
qrepConfig.WatermarkColumn = "ctid"
15 changes: 6 additions & 9 deletions flow/e2e/s3/s3_helper.go
@@ -19,8 +19,8 @@ import (

type S3TestHelper struct {
client *s3.Client
- s3Config *protos.S3Config
- bucketName string
+ S3Config *protos.S3Config
+ BucketName string
prefix string
}

@@ -83,10 +83,9 @@ func (h *S3TestHelper) ListAllFiles(
ctx context.Context,
jobName string,
) ([]s3types.Object, error) {
- Bucket := h.bucketName
Prefix := fmt.Sprintf("%s/%s/", h.prefix, jobName)
files, err := h.client.ListObjects(ctx, &s3.ListObjectsInput{
- Bucket: &Bucket,
+ Bucket: &h.BucketName,
Prefix: &Prefix,
})
if err != nil {
@@ -97,11 +96,9 @@

// Delete all generated objects during the test
func (h *S3TestHelper) CleanUp(ctx context.Context) error {
- Bucket := h.bucketName
- Prefix := h.prefix
files, err := h.client.ListObjects(ctx, &s3.ListObjectsInput{
- Bucket: &Bucket,
- Prefix: &Prefix,
+ Bucket: &h.BucketName,
+ Prefix: &h.prefix,
})
if err != nil {
return err
@@ -110,7 +107,7 @@ func (h *S3TestHelper) CleanUp(ctx context.Context) error {
// Delete each object
for _, obj := range files.Contents {
deleteInput := &s3.DeleteObjectInput{
- Bucket: &Bucket,
+ Bucket: &h.BucketName,
Key: obj.Key,
}

2 changes: 1 addition & 1 deletion flow/go.mod
@@ -1,6 +1,6 @@
module github.com/PeerDB-io/peer-flow

- go 1.22.4
+ go 1.22.5

require (
cloud.google.com/go v0.115.0
11 changes: 5 additions & 6 deletions flow/workflows/sync_flow.go
@@ -85,6 +85,11 @@ func SyncFlowWorkflow(
})
}

+ syncFlowCtx := workflow.WithActivityOptions(syncSessionCtx, workflow.ActivityOptions{
+ StartToCloseTimeout: 72 * time.Hour,
+ HeartbeatTimeout: time.Minute,
+ WaitForCancellation: true,
+ })
for !stop && ctx.Err() == nil {
var syncDone bool
mustWait := waitSelector != nil
@@ -93,12 +98,6 @@
currentSyncFlowNum += 1
logger.Info("executing sync flow", slog.Int("count", currentSyncFlowNum))

- syncFlowCtx := workflow.WithActivityOptions(syncSessionCtx, workflow.ActivityOptions{
- StartToCloseTimeout: 72 * time.Hour,
- HeartbeatTimeout: time.Minute,
- WaitForCancellation: true,
- })

var syncFlowFuture workflow.Future
if config.System == protos.TypeSystem_Q {
syncFlowFuture = workflow.ExecuteActivity(syncFlowCtx, flowable.SyncRecords, config, options, sessionID)
