Skip to content

Commit

Permalink
fix: add workarounds for duckdb's limitation on sequences (#333)
Browse files Browse the repository at this point in the history
  • Loading branch information
fanyang01 authored Dec 27, 2024
1 parent 0c978f0 commit 0283fe8
Show file tree
Hide file tree
Showing 5 changed files with 204 additions and 38 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/bats-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ jobs:
pip3 install "sqlglot[rs]" pyarrow pandas
curl -LJO https://github.com/duckdb/duckdb/releases/download/v1.1.3/duckdb_cli-linux-amd64.zip
curl -LJO https://github.com/duckdb/duckdb/releases/latest/download/duckdb_cli-linux-amd64.zip
unzip duckdb_cli-linux-amd64.zip
chmod +x duckdb
sudo mv duckdb /usr/local/bin
Expand Down
104 changes: 104 additions & 0 deletions .github/workflows/mysql-copy-tests.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
name: MySQL Copy Instance Test

on:
push:
branches: [ "main" ]
pull_request:
branches: [ "main" ]

jobs:
copy-instance-test:
runs-on: ubuntu-latest
services:
source:
image: mysql:lts
env:
MYSQL_ROOT_PASSWORD: root
ports:
- 13306:3306
options: >-
--health-cmd="mysqladmin ping"
--health-interval=10s
--health-timeout=5s
--health-retries=3
steps:
- uses: actions/checkout@v4

- name: Set up Go
uses: actions/setup-go@v5
with:
go-version: '1.23'

- name: Set up Python
uses: actions/setup-python@v5
with:
python-version: '3.13'

- name: Install system packages
uses: awalsh128/cache-apt-pkgs-action@latest
with:
packages: libnsl2 # required by MySQL Shell
version: 1.1

- name: Install dependencies
run: |
go get .
pip3 install "sqlglot[rs]"
curl -LJO https://dev.mysql.com/get/Downloads/MySQL-Shell/mysql-shell_9.1.0-1debian12_amd64.deb
sudo dpkg -i ./mysql-shell_9.1.0-1debian12_amd64.deb
- name: Setup test data in source MySQL
run: |
mysqlsh -hlocalhost -P13306 -uroot -proot --sql -e "
CREATE DATABASE testdb;
USE testdb;
CREATE TABLE users (
id INT AUTO_INCREMENT PRIMARY KEY,
name VARCHAR(100)
);
INSERT INTO users (name) VALUES ('test1'), ('test2'), ('test3');
-- Make a gap in the id sequence
INSERT INTO users VALUES (100, 'test100');
INSERT INTO users (name) VALUES ('test101');
-- A table with non-default starting auto_increment value
CREATE TABLE items (
id INT AUTO_INCREMENT PRIMARY KEY,
name VARCHAR(100)
) AUTO_INCREMENT=1000;
INSERT INTO items (name) VALUES ('item1'), ('item2'), ('item3');
"
- name: Build and start MyDuck Server
run: |
go build -v
./myduckserver &
sleep 5
- name: Run copy-instance test
run: |
# Set local_infile to true to allow loading data from files
mysqlsh -uroot --no-password --sql -e "SET GLOBAL local_infile = 1;"
# Copy the data from source MySQL to MyDuck
mysqlsh -hlocalhost -P13306 -uroot -proot \
-- util copy-instance "mysql://root:@127.0.0.1:3306" \
--users false --ignore-version true
# Verify the data was copied
for table in users items; do
mysqlsh -hlocalhost -P13306 -uroot -proot --sql -e "
SELECT * FROM testdb.$table ORDER BY id;
" | tee source_data_$table.tsv
mysqlsh -uroot --no-password --sql -e "
SELECT * FROM testdb.$table ORDER BY id;
" | tee copied_data_$table.tsv
diff source_data_$table.tsv copied_data_$table.tsv
done
14 changes: 12 additions & 2 deletions catalog/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,18 +73,29 @@ func (d *Database) GetTableInsensitive(ctx *sql.Context, tblName string) (sql.Ta
// tablesInsensitive returns the tables that findTables reports for pattern,
// after calling withSchema on each of them. Any failure is logged through the
// context logger together with the catalog, database and pattern (plus the
// table name for schema failures) and is then returned to the caller.
func (d *Database) tablesInsensitive(ctx *sql.Context, pattern string) ([]*Table, error) {
	found, findErr := d.findTables(ctx, pattern)
	if findErr != nil {
		ctx.GetLogger().
			WithFields(logrus.Fields{
				"catalog":  d.catalog,
				"database": d.name,
				"pattern":  pattern,
			}).
			WithError(findErr).
			Error("Failed to find tables")
		return nil, findErr
	}

	for i := range found {
		tbl := found[i]
		schemaErr := tbl.withSchema(ctx)
		if schemaErr == nil {
			continue
		}
		// Stop at the first table whose schema cannot be resolved.
		ctx.GetLogger().
			WithFields(logrus.Fields{
				"catalog":  d.catalog,
				"database": d.name,
				"pattern":  pattern,
				"table":    tbl.Name(),
			}).
			WithError(schemaErr).
			Error("Failed to get table schema")
		return nil, schemaErr
	}

	return found, nil
}

func (d *Database) findTables(ctx *sql.Context, pattern string) ([]*Table, error) {
rows, err := adapter.QueryCatalog(ctx, "SELECT DISTINCT table_name, comment FROM duckdb_tables() where (database_name = ? and schema_name = ? and table_name ILIKE ?) or (database_name = 'temp' and schema_name = 'main' and table_name ILIKE ?)", d.catalog, d.name, pattern, pattern)
rows, err := adapter.QueryCatalog(ctx, "SELECT DISTINCT table_name, comment FROM duckdb_tables() WHERE (database_name = ? AND schema_name = ? AND table_name ILIKE ?) OR (temporary IS TRUE AND table_name ILIKE ?)", d.catalog, d.name, pattern, pattern)
if err != nil {
return nil, ErrDuckDB.New(err)
}
Expand Down Expand Up @@ -113,7 +124,6 @@ func (d *Database) Name() string {
}

func (d *Database) createAllTable(ctx *sql.Context, name string, schema sql.PrimaryKeySchema, collation sql.CollationID, comment string, temporary bool) error {

var columns []string
var columnCommentSQLs []string
var fullTableName string
Expand Down
116 changes: 87 additions & 29 deletions catalog/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,14 @@ func getPrimaryKeyOrdinals(ctx *sql.Context, catalogName, dbName, tableName stri
return ordinals
}

// getCreateSequence builds the statement that creates the sequence named
// sequenceName and the qualified name under which that sequence is referenced.
//
// For temporary sequences the statement is `CREATE TEMP SEQUENCE "<name>"` and
// the qualified name is `temp.main."<name>"`. Otherwise the sequence is placed
// in the InternalSchemas.SYS.Schema schema and the statement uses the
// qualified name directly. The sequence name is always wrapped in double quotes.
func getCreateSequence(temporary bool, sequenceName string) (createStmt, fullName string) {
	quoted := `"` + sequenceName + `"`

	if !temporary {
		fullName = InternalSchemas.SYS.Schema + `.` + quoted
		createStmt = `CREATE SEQUENCE ` + fullName
		return createStmt, fullName
	}

	createStmt = `CREATE TEMP SEQUENCE ` + quoted
	fullName = `temp.main.` + quoted
	return createStmt, fullName
}

// AddColumn implements sql.AlterableTable.
func (t *Table) AddColumn(ctx *sql.Context, column *sql.Column, order *sql.ColumnOrder) error {
t.mu.Lock()
Expand All @@ -215,7 +223,7 @@ func (t *Table) AddColumn(ctx *sql.Context, column *sql.Column, order *sql.Colum
sql := `ALTER TABLE ` + FullTableName(t.db.catalog, t.db.name, t.name) + ` ADD COLUMN ` + QuoteIdentifierANSI(column.Name) + ` ` + typ.name

temporary := t.db.catalog == "temp"
var sequenceName, fullSequenceName string
var sequenceName, fullSequenceName, createSequenceStmt string

if column.Default != nil {
typ.mysql.Default = column.Default.String()
Expand All @@ -233,24 +241,13 @@ func (t *Table) AddColumn(ctx *sql.Context, column *sql.Column, order *sql.Colum
return err
}
sequenceName = SequenceNamePrefix + uuid.String()
if temporary {
fullSequenceName = `temp.main."` + sequenceName + `"`
} else {
fullSequenceName = InternalSchemas.SYS.Schema + `."` + sequenceName + `"`
}
createSequenceStmt, fullSequenceName = getCreateSequence(temporary, sequenceName)
sqls = append(sqls, createSequenceStmt)

defaultExpr := `nextval('` + fullSequenceName + `')`
sql += " DEFAULT " + defaultExpr
}

if column.AutoIncrement {
if temporary {
sqls = append(sqls, `CREATE TEMP SEQUENCE "`+sequenceName+`"`)
} else {
sqls = append(sqls, `CREATE SEQUENCE `+fullSequenceName)
}
}

sqls = append(sqls, sql)

// DuckDB does not support constraints in ALTER TABLE ADD COLUMN statement,
Expand Down Expand Up @@ -387,7 +384,7 @@ func (t *Table) ModifyColumn(ctx *sql.Context, columnName string, column *sql.Co
tableInfoChanged := false

temporary := t.db.catalog == "temp"
var sequenceName, fullSequenceName string
var sequenceName, fullSequenceName, createSequenceStmt string

// Handle AUTO_INCREMENT changes
if !oldColumn.AutoIncrement && column.AutoIncrement {
Expand All @@ -398,17 +395,8 @@ func (t *Table) ModifyColumn(ctx *sql.Context, columnName string, column *sql.Co
return err
}
sequenceName = SequenceNamePrefix + uuid.String()
if temporary {
fullSequenceName = `temp.main."` + sequenceName + `"`
} else {
fullSequenceName = InternalSchemas.SYS.Schema + `."` + sequenceName + `"`
}

if temporary {
sqls = append(sqls, `CREATE TEMP SEQUENCE "`+sequenceName+`"`)
} else {
sqls = append(sqls, `CREATE SEQUENCE `+fullSequenceName)
}
createSequenceStmt, fullSequenceName = getCreateSequence(temporary, sequenceName)
sqls = append(sqls, createSequenceStmt)
sqls = append(sqls, baseSQL+` SET DEFAULT nextval('`+fullSequenceName+`')`)

// Update table comment with sequence info
Expand Down Expand Up @@ -782,7 +770,18 @@ func (t *Table) PeekNextAutoIncrementValue(ctx *sql.Context) (uint64, error) {
var val uint64
err := adapter.QueryRowCatalog(ctx, `SELECT currval('`+t.comment.Meta.Sequence+`') + 1`).Scan(&val)
if err != nil {
return 0, ErrDuckDB.New(err)
// https://duckdb.org/docs/sql/statements/create_sequence.html#selecting-the-current-value
// > Note that the nextval function must have already been called before calling currval,
// > otherwise a Serialization Error (sequence is not yet defined in this session) will be thrown.
if !strings.Contains(err.Error(), "sequence is not yet defined in this session") {
return 0, ErrDuckDB.New(err)
}
// If the sequence has not been used yet, we can get the start value from the sequence.
// See getCreateSequence() for the sequence name format.
err = adapter.QueryRowCatalog(ctx, `SELECT start_value FROM duckdb_sequences() WHERE concat(schema_name, '."', sequence_name, '"') = '`+t.comment.Meta.Sequence+`'`).Scan(&val)
if err != nil {
return 0, ErrDuckDB.New(err)
}
}

return val, nil
Expand Down Expand Up @@ -834,8 +833,67 @@ func (t *Table) AutoIncrementSetter(ctx *sql.Context) sql.AutoIncrementSetter {

// setAutoIncrementValue is a helper function to update the sequence value
func (t *Table) setAutoIncrementValue(ctx *sql.Context, value uint64) error {
_, err := adapter.ExecCatalog(ctx, `CREATE OR REPLACE SEQUENCE `+t.comment.Meta.Sequence+` START WITH `+strconv.FormatUint(value, 10))
return err
// DuckDB does not support setting the sequence value directly,
// so we need to recreate the sequence with the new start value.
//
// _, err := adapter.ExecCatalog(ctx, `CREATE OR REPLACE SEQUENCE `+t.comment.Meta.Sequence+` START WITH `+strconv.FormatUint(value, 10))
//
// However, `CREATE OR REPLACE` leads to a Dependency Error,
// while `ALTER TABLE ... ALTER COLUMN ... DROP DEFAULT` does not remove the dependency:
// https://github.com/duckdb/duckdb/issues/15399
// So we create a new sequence with the new start value and change the auto_increment column to use the new sequence.

// Find the column with the auto_increment property
var autoIncrementColumn *sql.Column
for _, column := range t.schema.Schema {
if column.AutoIncrement {
autoIncrementColumn = column
break
}
}
if autoIncrementColumn == nil {
return sql.ErrNoAutoIncrementCol
}

// Generate a random sequence name.
uuid, err := uuid.NewRandom()
if err != nil {
return err
}
sequenceName := SequenceNamePrefix + uuid.String()

// Create a new sequence with the new start value
temporary := t.db.catalog == "temp"
createSequenceStmt, fullSequenceName := getCreateSequence(temporary, sequenceName)
_, err = adapter.Exec(ctx, createSequenceStmt+` START WITH `+strconv.FormatUint(value, 10))
if err != nil {
return ErrDuckDB.New(err)
}

// Update the auto_increment column to use the new sequence
alterStmt := `ALTER TABLE ` + FullTableName(t.db.catalog, t.db.name, t.name) +
` ALTER COLUMN ` + QuoteIdentifierANSI(autoIncrementColumn.Name) +
` SET DEFAULT nextval('` + fullSequenceName + `')`
if _, err = adapter.Exec(ctx, alterStmt); err != nil {
return ErrDuckDB.New(err)
}

// Drop the old sequence
// https://github.com/duckdb/duckdb/issues/15399
// if _, err = adapter.Exec(ctx, "DROP SEQUENCE " + t.comment.Meta.Sequence); err != nil {
// return ErrDuckDB.New(err)
// }

// Update the table comment with the new sequence name
tableInfo := t.comment.Meta
tableInfo.Sequence = fullSequenceName
comment := NewCommentWithMeta(t.comment.Text, tableInfo)
if _, err = adapter.Exec(ctx, `COMMENT ON TABLE `+FullTableName(t.db.catalog, t.db.name, t.name)+` IS '`+comment.Encode()+`'`); err != nil {
return ErrDuckDB.New(err)
}

t.comment.Meta.Sequence = fullSequenceName
return t.withSchema(ctx)
}

// autoIncrementSetter implements the AutoIncrementSetter interface
Expand Down
6 changes: 0 additions & 6 deletions main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1164,12 +1164,6 @@ func TestCreateTable(t *testing.T) {
"insert_into_t1_(b)_values_(1),_(2)",
"show_create_table_t1",
"select_*_from_t1_order_by_b",
"table_with_auto_increment_table_option",
"create_table_t1_(i_int)_auto_increment=10;",
"create_table_t2_(i_int_auto_increment_primary_key)_auto_increment=10;",
"show_create_table_t2",
"insert_into_t2_values_(null),_(null),_(null)",
"select_*_from_t2",
}

// Patch auto-generated queries that are known to fail
Expand Down

0 comments on commit 0283fe8

Please sign in to comment.