-
Notifications
You must be signed in to change notification settings - Fork 101
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add clickhouse connector #969
Conversation
701e2cd
to
a07b74d
Compare
add |
Tested this locally. Promising. Blockers for landing:
|
} | ||
|
||
func (c *ClickhouseConnector) ConsolidateQRepPartitions(config *protos.QRepConfig) error { | ||
c.logger.Info("Consolidating partitions noop") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
support UPSERT as well.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ignore as first target for CDC only needs append.
cleanup cleanup cleanup cleanup cleanup remove metadataSchema cleanup cleanup cleanup
127a009
to
0845521
Compare
flow/cmd/handler.go
Outdated
@@ -663,6 +663,16 @@ func (h *FlowRequestHandler) CreatePeer( | |||
} | |||
s3Config := s3ConfigObject.S3Config | |||
encodedConfig, encodingErr = proto.Marshal(s3Config) | |||
case protos.DBType_CLICKHOUSE: | |||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remove this new line
_ "github.com/ClickHouse/clickhouse-go/v2" | ||
_ "github.com/ClickHouse/clickhouse-go/v2/lib/driver" | ||
|
||
//_ "github.com/ClickHouse/clickhouse-go" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remove this
logger slog.Logger | ||
} | ||
|
||
// creating this to capture array results from clicknhouse. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
you mentioned array support not being ready, can this be added as a part of array support MR?
return &ClickhouseConnector{ | ||
ctx: ctx, | ||
database: database, | ||
//database: conn, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remove this
dsn := fmt.Sprintf("tcp://%s:%d?username=%s&password=%s", //&database=%s" | ||
config.Host, config.Port, config.User, config.Password) //, config.Database | ||
|
||
fmt.Println("connecting...", dsn) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remove printlns from this file, pass a logger to this, see snowflake connector as an example.
} | ||
|
||
// parseTableName parses a table name into schema and table name. | ||
func parseTableName(tableName string) (*tableNameComponents, error) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
see: ParseSchemaTable
method.
flow/connectors/clickhouse/qrep.go
Outdated
|
||
func (c *ClickhouseConnector) getTableSchema(tableName string) ([]*sql.ColumnType, error) { | ||
//nolint:gosec | ||
queryString := fmt.Sprintf(` |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this query can be in one line, reads better
flow/connectors/clickhouse/qrep.go
Outdated
return false, fmt.Errorf("failed to execute query: %w", err) | ||
} | ||
|
||
//return count.Int64 > 0, nil |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remove this comment
flow/connectors/clickhouse/qrep.go
Outdated
} | ||
|
||
func (c *ClickhouseConnector) SetupQRepMetadataTables(config *protos.QRepConfig) error { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
emplty new line - remove
"github.com/PeerDB-io/peer-flow/model" | ||
"github.com/PeerDB-io/peer-flow/model/qvalue" | ||
"github.com/PeerDB-io/peer-flow/shared" | ||
_ "github.com/snowflakedb/gosnowflake" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
do we need thios?
No description provided.