Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Clickhouse cdc #1096

Merged
merged 36 commits into from
Jan 29, 2024
Merged
Changes from 1 commit
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
e112156
initial cdc code
pankaj-peerdb Jan 18, 2024
9a0e5a0
add generation of normalize schema (#1097)
iskakaushik Jan 18, 2024
db0e0d5
use catalog for metadata
pankaj-peerdb Jan 18, 2024
4718337
more changes
pankaj-peerdb Jan 19, 2024
d9cd595
data going in raw table
pankaj-peerdb Jan 22, 2024
f4620ba
fix normalize errors
iskakaushik Jan 22, 2024
f620e37
more normalize code
iskakaushik Jan 22, 2024
0f2db07
basic sketch of normalize
iskakaushik Jan 22, 2024
f3ab034
fix schemaless ness
iskakaushik Jan 22, 2024
9c9a0ff
fix table exists check
iskakaushik Jan 22, 2024
1f8d06f
more fixes
iskakaushik Jan 22, 2024
a697232
sign and version columns
iskakaushik Jan 22, 2024
8124e58
add other col values
iskakaushik Jan 22, 2024
bb8ca22
lweor case
iskakaushik Jan 22, 2024
b0a7265
hardcode staging
pankaj-peerdb Jan 22, 2024
b1b2ab1
fix stagingPath
pankaj-peerdb Jan 25, 2024
3c05b8e
cleanup
pankaj-peerdb Jan 25, 2024
eaaf63d
cleanup
pankaj-peerdb Jan 26, 2024
4abf02c
fix after cleanup
pankaj-peerdb Jan 28, 2024
3b21db0
fix after reabase
pankaj-peerdb Jan 28, 2024
2de2c4e
fix after rebase
pankaj-peerdb Jan 28, 2024
fb598ac
fix s3 url formation issue
pankaj-peerdb Jan 29, 2024
ad208ce
restore docker
pankaj-peerdb Jan 29, 2024
a51dbf3
lint
pankaj-peerdb Jan 29, 2024
80dc51b
remove unused code
pankaj-peerdb Jan 29, 2024
d97f69a
fix build issue
pankaj-peerdb Jan 29, 2024
56ebce6
fix build issue
pankaj-peerdb Jan 29, 2024
553830f
fix build issue
pankaj-peerdb Jan 29, 2024
083a445
fix flow build
pankaj-peerdb Jan 29, 2024
08f822b
fix lint error in normalize.go
pankaj-peerdb Jan 29, 2024
8173def
fix more lint errors
pankaj-peerdb Jan 29, 2024
d6f89fc
prettier on ui
pankaj-peerdb Jan 29, 2024
9b9e57e
fix build after rebase
pankaj-peerdb Jan 29, 2024
b7d63a9
add newline back
pankaj-peerdb Jan 29, 2024
7566de4
ui prettier
pankaj-peerdb Jan 29, 2024
152b8c8
ts config new line
pankaj-peerdb Jan 29, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
hardcode staging
  • Loading branch information
pankaj-peerdb committed Jan 29, 2024
commit b0a7265709fc6cce3ccb22ca5a065f4e1c2fdbc8
6 changes: 3 additions & 3 deletions flow/connectors/clickhouse/qrep_avro_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func NewClickhouseAvroSyncMethod(

func (s *ClickhouseAvroSyncMethod) CopyStageToDestination(avroFile *avro.AvroFile) error {
fmt.Printf("\n************************* in CopyStageToDesti stagingPath: %+v", s.config.StagingPath)
stagingPath := s.config.StagingPath //"s3://avro-clickhouse"
stagingPath := "s3://avro-clickhouse" //s.config.StagingPath
s3o, err := utils.NewS3BucketAndPrefix(stagingPath)
if err != nil {
return err
Expand Down Expand Up @@ -158,7 +158,7 @@ func (s *ClickhouseAvroSyncMethod) SyncQRepRecords(
fmt.Printf("\n************************* in SyncQRepRecords 1")
startTime := time.Now()
dstTableName := config.DestinationTableIdentifier
//s.config.StagingPath = "s3://avro-clickhouse"
s.config.StagingPath = "s3://avro-clickhouse"

schema, err := stream.Schema()
if err != nil {
Expand Down Expand Up @@ -228,7 +228,7 @@ func (s *ClickhouseAvroSyncMethod) writeToAvroFile(
partitionID string,
flowJobName string,
) (*avro.AvroFile, error) {
stagingPath := s.config.StagingPath //"s3://avro-clickhouse"
stagingPath := "s3://avro-clickhouse" //s.config.StagingPath //
fmt.Printf("\n****************************************** StagingPath: %+v*****\n", s.config.StagingPath)
ocfWriter := avro.NewPeerDBOCFWriter(s.connector.ctx, stream, avroSchema, avro.CompressZstd,
qvalue.QDWHTypeClickhouse)
Expand Down