From 7bdd716731e676dab9411a4a18a987392ea2fd55 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Sat, 12 Oct 2024 14:59:53 +0000 Subject: [PATCH] OnResult sending empty blocks --- flow/connectors/clickhouse/cdc.go | 16 ++++++---------- 1 file changed, 6 insertions(+), 10 deletions(-) diff --git a/flow/connectors/clickhouse/cdc.go b/flow/connectors/clickhouse/cdc.go index d8067f2d58..51ab356bf6 100644 --- a/flow/connectors/clickhouse/cdc.go +++ b/flow/connectors/clickhouse/cdc.go @@ -2,7 +2,6 @@ package connclickhouse import ( "context" - "errors" "fmt" "log/slog" "strings" @@ -18,8 +17,8 @@ import ( ) const ( - checkIfTableExistsSQL = `SELECT exists(SELECT 1 FROM system.tables WHERE database = %s AND name = %s) AS table_exists;` - dropTableIfExistsSQL = `DROP TABLE IF EXISTS %s;` + checkIfTableExistsSQL = `SELECT exists(SELECT 1 FROM system.tables WHERE database = %s AND name = %s) AS table_exists` + dropTableIfExistsSQL = `DROP TABLE IF EXISTS %s` ) // getRawTableName returns the raw table name for the given table identifier. @@ -36,16 +35,13 @@ func (c *ClickHouseConnector) checkIfTableExists(ctx context.Context, databaseNa {Name: "table_exists", Data: &existsC}, }, OnResult: func(ctx context.Context, block chproto.Block) error { - if block.Rows != 1 { - return fmt.Errorf("[clickhouse] checkIfTableExists: expected 1 row, got %d", block.Rows) - } - if block.Info.Overflows { - return errors.New("[clickhouse] checkIfTableExists: expected 1 row, got block with overflow") - } return nil }, }); err != nil { - return false, fmt.Errorf("error while reading result row: %w", err) + return false, fmt.Errorf("[clickhouse] checkIfTableExists: error in query: %w", err) + } + if len(existsC) != 1 { + return false, fmt.Errorf("[clickhouse] checkIfTableExists: expected 1 row, got %d", len(existsC)) } return existsC[0] != 0, nil }