Skip to content

Commit

Permalink
OnResult sending empty blocks
Browse files Browse the repository at this point in the history
  • Loading branch information
serprex committed Oct 21, 2024
1 parent f6f1b09 commit 7bdd716
Showing 1 changed file with 6 additions and 10 deletions.
16 changes: 6 additions & 10 deletions flow/connectors/clickhouse/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package connclickhouse

import (
"context"
"errors"
"fmt"
"log/slog"
"strings"
Expand All @@ -18,8 +17,8 @@ import (
)

const (
checkIfTableExistsSQL = `SELECT exists(SELECT 1 FROM system.tables WHERE database = %s AND name = %s) AS table_exists;`
dropTableIfExistsSQL = `DROP TABLE IF EXISTS %s;`
checkIfTableExistsSQL = `SELECT exists(SELECT 1 FROM system.tables WHERE database = %s AND name = %s) AS table_exists`
dropTableIfExistsSQL = `DROP TABLE IF EXISTS %s`
)

// getRawTableName returns the raw table name for the given table identifier.
Expand All @@ -36,16 +35,13 @@ func (c *ClickHouseConnector) checkIfTableExists(ctx context.Context, databaseNa
{Name: "table_exists", Data: &existsC},
},
OnResult: func(ctx context.Context, block chproto.Block) error {
if block.Rows != 1 {
return fmt.Errorf("[clickhouse] checkIfTableExists: expected 1 row, got %d", block.Rows)
}
if block.Info.Overflows {
return errors.New("[clickhouse] checkIfTableExists: expected 1 row, got block with overflow")
}
return nil
},
}); err != nil {
return false, fmt.Errorf("error while reading result row: %w", err)
return false, fmt.Errorf("[clickhouse] checkIfTableExists: error in query: %w", err)
}
if len(existsC) != 1 {
return false, fmt.Errorf("[clickhouse] checkIfTableExists: expected 1 row, got %d", len(existsC))
}
return existsC[0] != 0, nil
}
Expand Down

0 comments on commit 7bdd716

Please sign in to comment.