Skip to content

Commit

Permalink
fix after rebase
Browse files Browse the repository at this point in the history
  • Loading branch information
pankaj-peerdb committed Jan 28, 2024
1 parent 00828cb commit d83b466
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 25 deletions.
3 changes: 2 additions & 1 deletion flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -299,12 +299,13 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,
TableMappings: input.FlowConnectionConfigs.TableMappings,
StagingPath: input.FlowConnectionConfigs.CdcStagingPath,
})
res.RelationMessageMapping = input.RelationMessageMapping

Check failure on line 302 in flow/activities/flowable.go

View workflow job for this annotation

GitHub Actions / lint

File is not `gofumpt`-ed (gofumpt)
if err != nil {
slog.Warn("failed to push records", slog.Any("error", err))
a.Alerter.LogFlowError(ctx, flowName, err)
return nil, fmt.Errorf("failed to push records: %w", err)
}
res.RelationMessageMapping = input.RelationMessageMapping

err = errGroup.Wait()
if err != nil {
Expand Down
8 changes: 4 additions & 4 deletions flow/connectors/clickhouse/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,8 +139,8 @@ func (c *ClickhouseConnector) syncRecordsViaAvro(
return nil, err
}

tableSchemaDeltas := req.Records.WaitForSchemaDeltas(req.TableMappings)
err = c.ReplayTableSchemaDeltas(req.FlowJobName, tableSchemaDeltas)
//tableSchemaDeltas := req.Records.WaitForSchemaDeltas(req.TableMappings)

Check failure on line 142 in flow/connectors/clickhouse/cdc.go

View workflow job for this annotation

GitHub Actions / lint

commentFormatting: put a space between `//` and comment text (gocritic)
err = c.ReplayTableSchemaDeltas(req.FlowJobName, req.Records.SchemaDeltas)
if err != nil {
return nil, fmt.Errorf("failed to sync schema changes: %w", err)
}
Expand All @@ -155,8 +155,8 @@ func (c *ClickhouseConnector) syncRecordsViaAvro(
NumRecordsSynced: int64(numRecords),
CurrentSyncBatchID: syncBatchID,
TableNameRowsMapping: tableNameRowsMapping,
TableSchemaDeltas: tableSchemaDeltas,
RelationMessageMapping: <-req.Records.RelationMessageMapping,
TableSchemaDeltas: req.Records.SchemaDeltas,
//RelationMessageMapping: <-req.Records.RelationMessageMapping,

Check failure on line 159 in flow/connectors/clickhouse/cdc.go

View workflow job for this annotation

GitHub Actions / lint

File is not `gofumpt`-ed (gofumpt)
}, nil
}

Expand Down
37 changes: 17 additions & 20 deletions flow/connectors/clickhouse/normalize.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,25 +104,25 @@ func generateCreateTableSQLForNormalizedTable(
}

func (c *ClickhouseConnector) NormalizeRecords(req *model.NormalizeRecordsRequest) (*model.NormalizeResponse, error) {
batchIDs, err := c.GetLastSyncAndNormalizeBatchID(req.FlowJobName)
normBatchID, err := c.GetLastNormalizeBatchID(req.FlowJobName)
if err != nil {
c.logger.ErrorContext(c.ctx, "[clickhouse] error while getting last sync and normalize batch id", err)
return nil, err
}

// normalize has caught up with sync, chill until more records are loaded.
if batchIDs.NormalizeBatchID >= batchIDs.SyncBatchID {
if normBatchID >= req.SyncBatchID {
return &model.NormalizeResponse{
Done: false,
StartBatchID: batchIDs.NormalizeBatchID,
EndBatchID: batchIDs.SyncBatchID,
StartBatchID: normBatchID,
EndBatchID: req.SyncBatchID,
}, nil
}

destinationTableNames, err := c.getDistinctTableNamesInBatch(
req.FlowJobName,
batchIDs.SyncBatchID,
batchIDs.NormalizeBatchID,
req.SyncBatchID,
normBatchID,
)
if err != nil {
c.logger.ErrorContext(c.ctx, "[clickhouse] error while getting distinct table names in batch", err)
Expand Down Expand Up @@ -179,9 +179,9 @@ func (c *ClickhouseConnector) NormalizeRecords(req *model.NormalizeRecordsReques
selectQuery.WriteString(" FROM ")
selectQuery.WriteString(rawTbl)
selectQuery.WriteString(" WHERE _peerdb_batch_id > ")
selectQuery.WriteString(fmt.Sprintf("%d", batchIDs.NormalizeBatchID))
selectQuery.WriteString(fmt.Sprintf("%d", normBatchID))
selectQuery.WriteString(" AND _peerdb_batch_id <= ")
selectQuery.WriteString(fmt.Sprintf("%d", batchIDs.SyncBatchID))
selectQuery.WriteString(fmt.Sprintf("%d", req.SyncBatchID))
selectQuery.WriteString(" AND _peerdb_destination_table_name = '")
selectQuery.WriteString(tbl)
selectQuery.WriteString("'")
Expand All @@ -203,12 +203,12 @@ func (c *ClickhouseConnector) NormalizeRecords(req *model.NormalizeRecordsReques
}
}

endNormalizeBatchId := batchIDs.NormalizeBatchID + 1
endNormalizeBatchId := normBatchID + 1
c.pgMetadata.UpdateNormalizeBatchID(req.FlowJobName, endNormalizeBatchId)

Check failure on line 207 in flow/connectors/clickhouse/normalize.go

View workflow job for this annotation

GitHub Actions / lint

Error return value of `c.pgMetadata.UpdateNormalizeBatchID` is not checked (errcheck)
return &model.NormalizeResponse{
Done: true,
StartBatchID: endNormalizeBatchId,
EndBatchID: batchIDs.SyncBatchID,
EndBatchID: req.SyncBatchID,
}, nil
}

Expand Down Expand Up @@ -246,19 +246,16 @@ func (c *ClickhouseConnector) getDistinctTableNamesInBatch(
return tableNames, nil
}

func (c *ClickhouseConnector) GetLastSyncAndNormalizeBatchID(flowJobName string) (model.SyncAndNormalizeBatchID, error) {
syncBatchID, err := c.pgMetadata.GetLastBatchID(flowJobName)
if err != nil {
return model.SyncAndNormalizeBatchID{}, fmt.Errorf("error while getting last sync batch id: %w", err)
}
func (c *ClickhouseConnector) GetLastNormalizeBatchID(flowJobName string) (int64, error) {
// syncBatchID, err := c.pgMetadata.GetLastBatchID(flowJobName)
// if err != nil {
// return 0, fmt.Errorf("error while getting last sync batch id: %w", err)
// }

normalizeBatchID, err := c.pgMetadata.GetLastNormalizeBatchID(flowJobName)
if err != nil {
return model.SyncAndNormalizeBatchID{}, fmt.Errorf("error while getting last normalize batch id: %w", err)
return 0, fmt.Errorf("error while getting last normalize batch id: %w", err)
}

return model.SyncAndNormalizeBatchID{
SyncBatchID: syncBatchID,
NormalizeBatchID: normalizeBatchID,
}, nil
return normalizeBatchID, nil
}

0 comments on commit d83b466

Please sign in to comment.