From 45441f8dce5297cc4b8a2bf448ba5e4a987c18db Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Frederik=20Sch=C3=B6ll?= Date: Thu, 31 Oct 2024 16:03:59 +0100 Subject: [PATCH] allow primary key duplicates on Clickhouse --- CHANGELOG.md | 1 + db/dialect.go | 1 + db/dialect_clickhouse.go | 4 ++++ db/dialect_postgres.go | 4 ++++ db/ops.go | 2 +- 5 files changed, 11 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 67e6914..3dc2393 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## Unreleased * Added support for the Clickhouse `Date` type. +* Removed the check for duplicate primary keys on the Clickhouse dialect. This allows inserting multiple rows with the same primary key. ## v4.3.0 diff --git a/db/dialect.go b/db/dialect.go index 4e3ea5d..6a89e89 100644 --- a/db/dialect.go +++ b/db/dialect.go @@ -26,6 +26,7 @@ type dialect interface { Flush(tx Tx, ctx context.Context, l *Loader, outputModuleHash string, lastFinalBlock uint64) (int, error) Revert(tx Tx, ctx context.Context, l *Loader, lastValidFinalBlock uint64) error OnlyInserts() bool + AllowPkDuplicates() bool CreateUser(tx Tx, ctx context.Context, l *Loader, username string, password string, database string, readOnly bool) error } diff --git a/db/dialect_clickhouse.go b/db/dialect_clickhouse.go index 43b42ac..0195954 100644 --- a/db/dialect_clickhouse.go +++ b/db/dialect_clickhouse.go @@ -131,6 +131,10 @@ func (d clickhouseDialect) OnlyInserts() bool { return true } +func (d clickhouseDialect) AllowPkDuplicates() bool { + return true +} + func (d clickhouseDialect) CreateUser(tx Tx, ctx context.Context, l *Loader, username string, password string, _database string, readOnly bool) error { user, pass := EscapeIdentifier(username), escapeStringValue(password) diff --git a/db/dialect_postgres.go b/db/dialect_postgres.go index a591b6d..4478ecb 100644 --- a/db/dialect_postgres.go +++ b/db/dialect_postgres.go @@ -246,6 +246,10 @@ func (d postgresDialect) OnlyInserts() bool { return false } +func (d postgresDialect) AllowPkDuplicates() bool { + return false +} + func (d postgresDialect) CreateUser(tx Tx, ctx context.Context, l *Loader, username string, password string, database string, readOnly bool) error { user, pass, db := EscapeIdentifier(username), password, EscapeIdentifier(database) var q string diff --git a/db/ops.go b/db/ops.go index 2922ef0..b2cdb12 100644 --- a/db/ops.go +++ b/db/ops.go @@ -32,7 +32,7 @@ func (l *Loader) Insert(tableName string, primaryKey map[string]string, data map l.entries.Set(tableName, entry) } - if _, found := entry.Get(uniqueID); found { + if _, found := entry.Get(uniqueID); found && !l.getDialect().AllowPkDuplicates() { return fmt.Errorf("attempting to insert in table %q a primary key %q, that is already scheduled for insertion, insert should only be called once for a given primary key", tableName, primaryKey) }