From 1945a5af499cc3c9fe13ba8cf4edaf86f04abd4c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Sat, 12 Oct 2024 14:59:53 +0000 Subject: [PATCH] OnResult sending empty blocks --- flow/connectors/clickhouse/cdc.go | 16 ++++++---------- flow/connectors/core.go | 3 +-- 2 files changed, 7 insertions(+), 12 deletions(-) diff --git a/flow/connectors/clickhouse/cdc.go b/flow/connectors/clickhouse/cdc.go index d8067f2d58..51ab356bf6 100644 --- a/flow/connectors/clickhouse/cdc.go +++ b/flow/connectors/clickhouse/cdc.go @@ -2,7 +2,6 @@ package connclickhouse import ( "context" - "errors" "fmt" "log/slog" "strings" @@ -18,8 +17,8 @@ import ( ) const ( - checkIfTableExistsSQL = `SELECT exists(SELECT 1 FROM system.tables WHERE database = %s AND name = %s) AS table_exists;` - dropTableIfExistsSQL = `DROP TABLE IF EXISTS %s;` + checkIfTableExistsSQL = `SELECT exists(SELECT 1 FROM system.tables WHERE database = %s AND name = %s) AS table_exists` + dropTableIfExistsSQL = `DROP TABLE IF EXISTS %s` ) // getRawTableName returns the raw table name for the given table identifier. @@ -36,16 +35,13 @@ func (c *ClickHouseConnector) checkIfTableExists(ctx context.Context, databaseNa {Name: "table_exists", Data: &existsC}, }, OnResult: func(ctx context.Context, block chproto.Block) error { - if block.Rows != 1 { - return fmt.Errorf("[clickhouse] checkIfTableExists: expected 1 row, got %d", block.Rows) - } - if block.Info.Overflows { - return errors.New("[clickhouse] checkIfTableExists: expected 1 row, got block with overflow") - } return nil }, }); err != nil { - return false, fmt.Errorf("error while reading result row: %w", err) + return false, fmt.Errorf("[clickhouse] checkIfTableExists: error in query: %w", err) + } + if len(existsC) != 1 { + return false, fmt.Errorf("[clickhouse] checkIfTableExists: expected 1 row, got %d", len(existsC)) } return existsC[0] != 0, nil } diff --git a/flow/connectors/core.go b/flow/connectors/core.go index e7dc2bf688..9facc671a8 100644 --- a/flow/connectors/core.go +++ b/flow/connectors/core.go @@ -426,8 +426,7 @@ func GetByNameAs[T Connector](ctx context.Context, env map[string]string, catalo } func CloseConnector(ctx context.Context, conn Connector) { - err := conn.Close() - if err != nil { + if err := conn.Close(); err != nil { logger.LoggerFromCtx(ctx).Error("error closing connector", slog.Any("error", err)) } }