Skip to content

Commit

Permalink
fix: handle virtual primary keys in slow path of LOAD DATA & fall bac…
Browse files Browse the repository at this point in the history
…k MySQL JSON loading temporarily
  • Loading branch information
fanyang01 committed Dec 29, 2024
1 parent 062442f commit e017172
Show file tree
Hide file tree
Showing 6 changed files with 58 additions and 12 deletions.
19 changes: 17 additions & 2 deletions backend/loaddata.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/dolthub/go-mysql-server/sql"
"github.com/dolthub/go-mysql-server/sql/plan"
"github.com/dolthub/go-mysql-server/sql/types"
"github.com/dolthub/vitess/go/vt/proto/query"
)

const isUnixSystem = runtime.GOOS == "linux" ||
Expand All @@ -29,7 +30,12 @@ func isRewritableLoadData(node *plan.LoadData) bool {
isSupportedLineTerminator(node.LinesTerminatedBy) &&
areAllExpressionsNil(node.SetExprs) &&
areAllExpressionsNil(node.UserVars) &&
isSupportedFileCharacterSet(node.Charset)
isSupportedFileCharacterSet(node.Charset) &&
// JSON columns often contain escaped characters,
// which cannot be handled by DuckDB perfectly until DuckDB 1.2.0.
// https://github.com/duckdb/duckdb/pull/14464
// TODO(fan): Remove this restriction after DuckDB 1.2.0 is released.
!containsJSONColumn(node.DestSch)
}

func areAllExpressionsNil(exprs []sql.Expression) bool {
Expand All @@ -52,6 +58,15 @@ func isSupportedLineTerminator(terminator string) bool {
return terminator == "\n" || terminator == "\r" || terminator == "\r\n"
}

// containsJSONColumn reports whether any column in the given schema
// is declared with the MySQL JSON type.
func containsJSONColumn(schema sql.Schema) bool {
	for i := range schema {
		if schema[i].Type.Type() == query.Type_JSON {
			return true
		}
	}
	return false
}

// buildLoadData translates a MySQL LOAD DATA statement
// into a DuckDB INSERT INTO statement and executes it.
func (db *DuckBuilder) buildLoadData(ctx *sql.Context, root sql.Node, insert *plan.InsertInto, dst sql.InsertableTable, load *plan.LoadData) (sql.RowIter, error) {
Expand Down Expand Up @@ -123,7 +138,7 @@ func (db *DuckBuilder) executeLoadData(ctx *sql.Context, insert *plan.InsertInto
// Replicated tables do not have physical primary keys.
// Their logical primary keys are fake and should not be used in INSERT INTO statements.
// https://github.com/apecloud/myduckserver/issues/272
keyless = t.ExtraTableInfo().Replicated
keyless = t.ExtraTableInfo().Replicated || !t.HasPrimaryKey()
}
}

Expand Down
7 changes: 4 additions & 3 deletions catalog/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func (d *Database) tablesInsensitive(ctx *sql.Context, pattern string) ([]*Table
}

func (d *Database) findTables(ctx *sql.Context, pattern string) ([]*Table, error) {
rows, err := adapter.QueryCatalog(ctx, "SELECT DISTINCT table_name, comment FROM duckdb_tables() WHERE (database_name = ? AND schema_name = ? AND table_name ILIKE ?) OR (temporary IS TRUE AND table_name ILIKE ?)", d.catalog, d.name, pattern, pattern)
rows, err := adapter.QueryCatalog(ctx, "SELECT table_name, has_primary_key, comment FROM duckdb_tables() WHERE (database_name = ? AND schema_name = ? AND table_name ILIKE ?) OR (temporary IS TRUE AND table_name ILIKE ?)", d.catalog, d.name, pattern, pattern)
if err != nil {
return nil, ErrDuckDB.New(err)
}
Expand All @@ -104,11 +104,12 @@ func (d *Database) findTables(ctx *sql.Context, pattern string) ([]*Table, error
var tbls []*Table
for rows.Next() {
var tblName string
var hasPrimaryKey bool
var comment stdsql.NullString
if err := rows.Scan(&tblName, &comment); err != nil {
if err := rows.Scan(&tblName, &hasPrimaryKey, &comment); err != nil {
return nil, ErrDuckDB.New(err)
}
t := NewTable(tblName, d).withComment(DecodeComment[ExtraTableInfo](comment.String))
t := NewTable(d, tblName, hasPrimaryKey).withComment(DecodeComment[ExtraTableInfo](comment.String))
tbls = append(tbls, t)
}
if err := rows.Err(); err != nil {
Expand Down
3 changes: 2 additions & 1 deletion catalog/inserter.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ type rowInserter struct {
db string
table string
schema sql.Schema
hasPK bool
replace bool

once sync.Once
Expand Down Expand Up @@ -69,7 +70,7 @@ func (ri *rowInserter) init(ctx *sql.Context) {

insert.Reset()
insert.WriteString("INSERT ")
if ri.replace {
if ri.replace && ri.hasPK {
insert.WriteString(" OR REPLACE")
}
insert.WriteString(" INTO ")
Expand Down
24 changes: 18 additions & 6 deletions catalog/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,14 @@ import (
)

type Table struct {
mu *sync.RWMutex
name string
mu sync.RWMutex
db *Database
comment *Comment[ExtraTableInfo] // save the comment to avoid querying duckdb everytime
name string
comment *Comment[ExtraTableInfo] // save the comment to avoid querying duckdb every time
schema sql.PrimaryKeySchema

// Whether the table has a physical primary key.
hasPrimaryKey bool
}

type ExtraTableInfo struct {
Expand Down Expand Up @@ -59,11 +62,12 @@ var _ sql.AutoIncrementTable = (*Table)(nil)
var _ sql.CheckTable = (*Table)(nil)
var _ sql.CheckAlterableTable = (*Table)(nil)

func NewTable(name string, db *Database) *Table {
func NewTable(db *Database, name string, hasPrimaryKey bool) *Table {
return &Table{
mu: &sync.RWMutex{},
name: name,
db: db,
name: name,

hasPrimaryKey: hasPrimaryKey,
}
}

Expand Down Expand Up @@ -96,6 +100,10 @@ func (t *Table) ExtraTableInfo() ExtraTableInfo {
return t.comment.Meta
}

// HasPrimaryKey reports whether the table has a physical primary key in DuckDB.
// Replicated tables may carry only a logical (fake) primary key, in which case
// this returns false.
func (t *Table) HasPrimaryKey() bool {
	return t.hasPrimaryKey
}

// Collation implements sql.Table.
func (t *Table) Collation() sql.CollationID {
return sql.Collation_Default
Expand Down Expand Up @@ -293,6 +301,7 @@ func (t *Table) AddColumn(ctx *sql.Context, column *sql.Column, order *sql.Colum
}
// Update the PK ordinals only after the column is successfully added.
if column.PrimaryKey {
t.hasPrimaryKey = true
t.comment.Meta.PkOrdinals = tableInfo.PkOrdinals
}
return t.withSchema(ctx)
Expand Down Expand Up @@ -457,6 +466,7 @@ func (t *Table) ModifyColumn(ctx *sql.Context, columnName string, column *sql.Co

// Update table metadata
if column.PrimaryKey {
t.hasPrimaryKey = true
t.comment.Meta.PkOrdinals = []int{oldColumnIndex}
}
if !oldColumn.AutoIncrement && column.AutoIncrement {
Expand Down Expand Up @@ -510,6 +520,7 @@ func (t *Table) Inserter(*sql.Context) sql.RowInserter {
db: t.db.Name(),
table: t.name,
schema: t.schema.Schema,
hasPK: t.hasPrimaryKey,
}
}

Expand All @@ -535,6 +546,7 @@ func (t *Table) Replacer(*sql.Context) sql.RowReplacer {
db: t.db.Name(),
table: t.name,
schema: t.schema.Schema,
hasPK: t.hasPrimaryKey,
replace: hasKey,
}
}
Expand Down
16 changes: 16 additions & 0 deletions test/bats/mysql/load_json_column.bats
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
#!/usr/bin/env bats
bats_require_minimum_version 1.5.0

load helper

# Regression test for issue #329: a TSV row whose JSON column contains
# backslash-escaped quotes must load correctly via the REPLACE slow path.
@test "Load a TSV file that contains an escaped JSON column" {
mysql_exec_stdin <<-EOF
CREATE DATABASE load_json_column;
USE load_json_column;
CREATE TABLE translations (code VARCHAR(100), domain VARCHAR(16), translations JSON);
SET GLOBAL local_infile = 1;
LOAD DATA LOCAL INFILE 'testdata/issue329.tsv' REPLACE INTO TABLE `load_json_column`.`translations` CHARACTER SET 'utf8mb4' FIELDS TERMINATED BY ' ' ESCAPED BY '\\' LINES STARTING BY '' TERMINATED BY '\n' (`code`, `domain`, `translations`);
EOF
# Exactly one data row exists in testdata/issue329.tsv, so the count must be 1.
run -0 mysql_exec 'SELECT COUNT(*) FROM `load_json_column`.`translations`'
[ "${output}" = "1" ]
}
1 change: 1 addition & 0 deletions testdata/issue329.tsv
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
_ORDER_LOADINGDATE_RESTRICTION messages {"de": "Das Ladedatum darf nicht kleiner oder gleich dem eingeschränkten Ladedatensatz im Bildschirm \\"Verschiedenes\\" sein", "en": "Loading date cannot be less than or equal to the restricted loading date set in root data / date limitation screen", "es": "Loading Date cannot be less than or equal to the restricted loading date set in miscelaneous screen", "fr": "Loading Date cannot be less than or equal to the restricted loading date set in miscelaneous screen", "nl": "Laaddatum mag niet kleiner zijn dan of gelijk zijn aan de beperkte laadgegevensset in het scherm stamgegevens"}

0 comments on commit e017172

Please sign in to comment.