Skip to content

Commit

Permalink
fix: handle virtual primary keys in slow path of LOAD DATA & fall bac…
Browse files Browse the repository at this point in the history
…k MySQL JSON loading temporarily (#336)

* fix: handle virtual primary keys in slow path of LOAD DATA & fall back MySQL JSON loading temporarily
* Support database/table filtering in MySQL replication
  • Loading branch information
fanyang01 authored Jan 2, 2025
1 parent 956e1c4 commit 4c8a81d
Show file tree
Hide file tree
Showing 14 changed files with 355 additions and 24 deletions.
17 changes: 13 additions & 4 deletions .github/workflows/replication-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -90,23 +90,29 @@ jobs:
docker exec source-db dolt sql -q "
CREATE DATABASE test;
CREATE TABLE test.items (id INT PRIMARY KEY, name VARCHAR(50));
INSERT INTO test.items VALUES (1, 'test1'), (2, 'test2');"
INSERT INTO test.items VALUES (1, 'test1'), (2, 'test2');
CREATE TABLE test.skip (id INT PRIMARY KEY, name VARCHAR(50));
INSERT INTO test.skip VALUES (1, 'abc'), (2, 'def');"
elif [ "${{ matrix.source }}" = "mariadb" ]; then
docker exec source-db mariadb -uroot -proot test -e "
CREATE TABLE items (id INT PRIMARY KEY, name VARCHAR(50));
INSERT INTO items VALUES (1, 'test1'), (2, 'test2');"
INSERT INTO items VALUES (1, 'test1'), (2, 'test2');
CREATE TABLE skip (id INT PRIMARY KEY, name VARCHAR(50));
INSERT INTO skip VALUES (1, 'abc'), (2, 'def');"
else
docker exec source-db mysql -uroot -proot test -e "
CREATE TABLE items (id INT PRIMARY KEY, name VARCHAR(50));
INSERT INTO items VALUES (1, 'test1'), (2, 'test2');"
INSERT INTO items VALUES (1, 'test1'), (2, 'test2');
CREATE TABLE skip (id INT PRIMARY KEY, name VARCHAR(50));
INSERT INTO skip VALUES (1, 'abc'), (2, 'def');"
fi
- name: Start MyDuck Server in replica mode
run: |
if [ "${{ matrix.source }}" = "postgres" ]; then
SOURCE_DSN="postgres://postgres:[email protected]:5432/test"
else
SOURCE_DSN="mysql://root:[email protected]:3306"
SOURCE_DSN="mysql://root:[email protected]:3306/test?skip-tables=skip"
fi
docker run -d --name myduck \
Expand Down Expand Up @@ -203,6 +209,9 @@ jobs:
exit 1
fi
# Print the logs
docker logs myduck
- name: Cleanup
if: always()
run: |
Expand Down
2 changes: 1 addition & 1 deletion backend/loaddata.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ func (db *DuckBuilder) executeLoadData(ctx *sql.Context, insert *plan.InsertInto
// Replicated tables do not have physical primary keys.
// Their logical primary keys are fake and should not be used in INSERT INTO statements.
// https://github.com/apecloud/myduckserver/issues/272
keyless = t.ExtraTableInfo().Replicated
keyless = t.ExtraTableInfo().Replicated || !t.HasPrimaryKey()
}
}

Expand Down
32 changes: 32 additions & 0 deletions binlogreplication/binlog_replica_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,18 @@ func (d *myBinlogReplicaController) SetReplicationSourceOptions(ctx *sql.Context
func (d *myBinlogReplicaController) SetReplicationFilterOptions(_ *sql.Context, options []binlogreplication.ReplicationOption) error {
for _, option := range options {
switch strings.ToUpper(option.Name) {
case "REPLICATE_DO_DB":
value, err := getOptionValueAsDatabaseNames(option)
if err != nil {
return err
}
d.filters.setDoDatabases(value)
case "REPLICATE_IGNORE_DB":
value, err := getOptionValueAsDatabaseNames(option)
if err != nil {
return err
}
d.filters.setIgnoreDatabases(value)
case "REPLICATE_DO_TABLE":
value, err := getOptionValueAsTableNames(option)
if err != nil {
Expand Down Expand Up @@ -378,6 +390,8 @@ func (d *myBinlogReplicaController) GetReplicaStatus(ctx *sql.Context) (*binlogr
copy.SourceServerUuid = replicaSourceInfo.Uuid
copy.ConnectRetry = replicaSourceInfo.ConnectRetryInterval
copy.SourceRetryCount = replicaSourceInfo.ConnectRetryCount
// copy.ReplicateDoDBs = d.filters.getDoDatabases()
// copy.ReplicateIgnoreDBs = d.filters.getIgnoreDatabases()
copy.ReplicateDoTables = d.filters.getDoTables()
copy.ReplicateIgnoreTables = d.filters.getIgnoreTables()

Expand Down Expand Up @@ -523,6 +537,24 @@ func getOptionValueAsTableNames(option binlogreplication.ReplicationOption) ([]s
"but expected a list of tables", option.Name, option.Value.GetValue())
}

func getOptionValueAsDatabaseNames(option binlogreplication.ReplicationOption) ([]string, error) {
// The value of the option should be a list of database names.
// But since the parser doesn't have a database name list type,
// we reuse the table name list type to represent a list of database names.
ov, ok := option.Value.(binlogreplication.TableNamesReplicationOptionValue)
if ok {
list := ov.GetValueAsTableList()
names := make([]string, len(list))
for i, t := range list {
names[i] = t.Name()
}
return names, nil
}

return nil, fmt.Errorf("unsupported value type for option %q; found %T, "+
"but expected a list of databases", option.Name, option.Value.GetValue())
}

func verifyAllTablesAreQualified(urts []sql.UnresolvedTable) error {
for _, urt := range urts {
if urt.Database().Name() == "" {
Expand Down
89 changes: 85 additions & 4 deletions binlogreplication/binlog_replica_filtering.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@ import (

// filterConfiguration defines the binlog filtering rules applied on the replica.
type filterConfiguration struct {
// doDatabases holds a map of database names that SHOULD be replicated.
doDatabases map[string]struct{}
// ignoreDatabases holds a map of database names that should NOT be replicated.
ignoreDatabases map[string]struct{}
// doTables holds a map of database name to map of table names, indicating tables that SHOULD be replicated.
doTables map[string]map[string]struct{}
// ignoreTables holds a map of database name to map of table names, indicating tables that should NOT be replicated.
Expand All @@ -36,9 +40,39 @@ type filterConfiguration struct {
// newFilterConfiguration creates a new filterConfiguration instance and initializes members.
func newFilterConfiguration() *filterConfiguration {
return &filterConfiguration{
doTables: make(map[string]map[string]struct{}),
ignoreTables: make(map[string]map[string]struct{}),
mu: &sync.Mutex{},
doDatabases: make(map[string]struct{}),
ignoreDatabases: make(map[string]struct{}),
doTables: make(map[string]map[string]struct{}),
ignoreTables: make(map[string]map[string]struct{}),
mu: &sync.Mutex{},
}
}

// setDoDatabases sets the databases that are allowed to replicate. If any DoDatabases were previously configured,
// they are cleared out before the new databases are set.
func (fc *filterConfiguration) setDoDatabases(databases []string) {
fc.mu.Lock()
defer fc.mu.Unlock()

// Setting new replication filters clears out any existing filters
fc.doDatabases = make(map[string]struct{})

for _, db := range databases {
fc.doDatabases[strings.ToLower(db)] = struct{}{}
}
}

// setIgnoreDatabases sets the databases that are NOT allowed to replicate. If any IgnoreDatabases were previously configured,
// they are cleared out before the new databases are set.
func (fc *filterConfiguration) setIgnoreDatabases(databases []string) {
fc.mu.Lock()
defer fc.mu.Unlock()

// Setting new replication filters clears out any existing filters
fc.ignoreDatabases = make(map[string]struct{})

for _, db := range databases {
fc.ignoreDatabases[strings.ToLower(db)] = struct{}{}
}
}

Expand Down Expand Up @@ -96,6 +130,38 @@ func (fc *filterConfiguration) setIgnoreTables(urts []sql.UnresolvedTable) error
return nil
}

// getDoDatabases returns a slice of database names that are configured to be replicated.
func (fc *filterConfiguration) getDoDatabases() []string {
fc.mu.Lock()
defer fc.mu.Unlock()

if len(fc.doDatabases) == 0 {
return nil
}

databases := make([]string, 0, len(fc.doDatabases))
for db := range fc.doDatabases {
databases = append(databases, db)
}
return databases
}

// getIgnoreDatabases returns a slice of database names that are configured to be filtered out of replication.
func (fc *filterConfiguration) getIgnoreDatabases() []string {
fc.mu.Lock()
defer fc.mu.Unlock()

if len(fc.ignoreDatabases) == 0 {
return nil
}

databases := make([]string, 0, len(fc.ignoreDatabases))
for db := range fc.ignoreDatabases {
databases = append(databases, db)
}
return databases
}

// isTableFilteredOut returns true if the table identified by |tableMap| has been filtered out on this replica and
// should not have any updates applied from binlog messages.
func (fc *filterConfiguration) isTableFilteredOut(ctx *sql.Context, tableMap *mysql.TableMap) bool {
Expand All @@ -109,6 +175,21 @@ func (fc *filterConfiguration) isTableFilteredOut(ctx *sql.Context, tableMap *my
fc.mu.Lock()
defer fc.mu.Unlock()

// If any filter doDatabase options are specified, then a database MUST be listed in the set
// for it to be replicated. doDatabase options are processed BEFORE ignoreDatabase options.
// https://dev.mysql.com/doc/refman/8.4/en/replication-rules-db-options.html
if len(fc.doDatabases) > 0 {
if _, ok := fc.doDatabases[db]; !ok {
ctx.GetLogger().Tracef("skipping database %s (not in doDatabases)", db)
return true
}
} else if len(fc.ignoreDatabases) > 0 {
if _, ok := fc.ignoreDatabases[db]; ok {
ctx.GetLogger().Tracef("skipping database %s (in ignoreDatabases)", db)
return true
}
}

// If any filter doTable options are specified, then a table MUST be listed in the set
// for it to be replicated. doTables options are processed BEFORE ignoreTables options.
// If a table appears in both doTable and ignoreTables, it is ignored.
Expand Down Expand Up @@ -160,7 +241,7 @@ func convertFilterMapToStringSlice(filterMap map[string]map[string]struct{}) []s

tableNames := make([]string, 0, len(filterMap))
for dbName, tableMap := range filterMap {
for tableName, _ := range tableMap {
for tableName := range tableMap {
tableNames = append(tableNames, fmt.Sprintf("%s.%s", dbName, tableName))
}
}
Expand Down
95 changes: 94 additions & 1 deletion binlogreplication/binlog_replication_filters_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (
"github.com/stretchr/testify/require"
)

// TestBinlogReplicationFilters_ignoreTablesOnly tests that the ignoreTables replication
// TestReplicationFilters_ignoreTablesOnly tests that the ignoreTables replication
// filtering option is correctly applied and honored.
func TestBinlogReplicationFilters_ignoreTablesOnly(t *testing.T) {
defer teardown(t)
Expand Down Expand Up @@ -189,3 +189,96 @@ func TestBinlogReplicationFilters_errorCases(t *testing.T) {
require.Error(t, err)
require.ErrorContains(t, err, "no database specified for table")
}

// TestReplicationFilters_ignoreDatabasesOnly tests that the ignoreDatabases replication
// filtering option is correctly applied and honored.
func TestReplicationFilters_ignoreDatabasesOnly(t *testing.T) {
defer teardown(t)
startSqlServersWithSystemVars(t, duckReplicaSystemVars)
startReplicationAndCreateTestDb(t, mySqlPort)

// Ignore replication events for db01. Also tests that the first filter setting is overwritten by
// the second and that db names are case-insensitive.
replicaDatabase.MustExec("CHANGE REPLICATION FILTER REPLICATE_IGNORE_DB=(db02);")
replicaDatabase.MustExec("CHANGE REPLICATION FILTER REPLICATE_IGNORE_DB=(DB01);")

// TODO(fan): Not implemented yet
// Assert that status shows replication filters
// status := showReplicaStatus(t)
// require.Equal(t, "db01", status["Replicate_Ignore_DB"])
// require.Equal(t, "", status["Replicate_Do_DB"])

// Make changes on the primary
primaryDatabase.MustExec("CREATE DATABASE db02;")
primaryDatabase.MustExec("CREATE TABLE db01.t1 (pk INT PRIMARY KEY);")
primaryDatabase.MustExec("CREATE TABLE db02.t1 (pk INT PRIMARY KEY);")
for i := 1; i < 12; i++ {
primaryDatabase.MustExec(fmt.Sprintf("INSERT INTO db01.t1 VALUES (%d);", i))
primaryDatabase.MustExec(fmt.Sprintf("INSERT INTO db02.t1 VALUES (%d);", i))
}

// Pause to let the replica catch up
waitForReplicaToCatchUp(t)

// Although the database is ignored, it is still created on the replica
// because the DDL statements are not filtered out.

// Verify that no changes from db01 were applied on the replica
rows, err := replicaDatabase.Queryx("SELECT COUNT(pk) as count FROM db01.t1;")
require.NoError(t, err)
row := convertMapScanResultToStrings(readNextRow(t, rows))
require.Equal(t, "0", row["count"])
require.NoError(t, rows.Close())

// Verify that all changes from db02 were applied on the replica
rows, err = replicaDatabase.Queryx("SELECT COUNT(pk) as count FROM db02.t1;")
require.NoError(t, err)
row = convertMapScanResultToStrings(readNextRow(t, rows))
require.Equal(t, "11", row["count"])
require.NoError(t, rows.Close())
}

// TestReplicationFilters_doDatabasesOnly tests that the doDatabases replication
// filtering option is correctly applied and honored.
func TestReplicationFilters_doDatabasesOnly(t *testing.T) {
defer teardown(t)
startSqlServersWithSystemVars(t, duckReplicaSystemVars)
startReplicationAndCreateTestDb(t, mySqlPort)

// Do replication events for db01. Also tests that the first filter setting is overwritten by
// the second and that db names are case-insensitive.
replicaDatabase.MustExec("CHANGE REPLICATION FILTER REPLICATE_DO_DB=(db02);")
replicaDatabase.MustExec("CHANGE REPLICATION FILTER REPLICATE_DO_DB=(DB01);")

// TODO(fan): Not implemented yet
// Assert that status shows replication filters
// status := showReplicaStatus(t)
// require.Equal(t, "db01", status["Replicate_Do_DB"])
// require.Equal(t, "", status["Replicate_Ignore_DB"])

// Make changes on the primary
primaryDatabase.MustExec("CREATE DATABASE db02;")
primaryDatabase.MustExec("CREATE TABLE db01.t1 (pk INT PRIMARY KEY);")
primaryDatabase.MustExec("CREATE TABLE db02.t1 (pk INT PRIMARY KEY);")
for i := 1; i < 12; i++ {
primaryDatabase.MustExec(fmt.Sprintf("INSERT INTO db01.t1 VALUES (%d);", i))
primaryDatabase.MustExec(fmt.Sprintf("INSERT INTO db02.t1 VALUES (%d);", i))
}

// Pause to let the replica catch up
waitForReplicaToCatchUp(t)

// Verify that all changes from db01 were applied on the replica
rows, err := replicaDatabase.Queryx("SELECT COUNT(pk) as count FROM db01.t1;")
require.NoError(t, err)
row := convertMapScanResultToStrings(readNextRow(t, rows))
require.Equal(t, "11", row["count"])
require.NoError(t, rows.Close())

// Verify that no changes from db02 were applied on the replica
rows, err = replicaDatabase.Queryx("SELECT COUNT(pk) as count FROM db02.t1;")
require.NoError(t, err)
row = convertMapScanResultToStrings(readNextRow(t, rows))
require.Equal(t, "0", row["count"])
require.NoError(t, rows.Close())
}
7 changes: 4 additions & 3 deletions catalog/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func (d *Database) tablesInsensitive(ctx *sql.Context, pattern string) ([]*Table
}

func (d *Database) findTables(ctx *sql.Context, pattern string) ([]*Table, error) {
rows, err := adapter.QueryCatalog(ctx, "SELECT DISTINCT table_name, comment FROM duckdb_tables() WHERE (database_name = ? AND schema_name = ? AND table_name ILIKE ?) OR (temporary IS TRUE AND table_name ILIKE ?)", d.catalog, d.name, pattern, pattern)
rows, err := adapter.QueryCatalog(ctx, "SELECT table_name, has_primary_key, comment FROM duckdb_tables() WHERE (database_name = ? AND schema_name = ? AND table_name ILIKE ?) OR (temporary IS TRUE AND table_name ILIKE ?)", d.catalog, d.name, pattern, pattern)
if err != nil {
return nil, ErrDuckDB.New(err)
}
Expand All @@ -104,11 +104,12 @@ func (d *Database) findTables(ctx *sql.Context, pattern string) ([]*Table, error
var tbls []*Table
for rows.Next() {
var tblName string
var hasPrimaryKey bool
var comment stdsql.NullString
if err := rows.Scan(&tblName, &comment); err != nil {
if err := rows.Scan(&tblName, &hasPrimaryKey, &comment); err != nil {
return nil, ErrDuckDB.New(err)
}
t := NewTable(tblName, d).withComment(DecodeComment[ExtraTableInfo](comment.String))
t := NewTable(d, tblName, hasPrimaryKey).withComment(DecodeComment[ExtraTableInfo](comment.String))
tbls = append(tbls, t)
}
if err := rows.Err(); err != nil {
Expand Down
3 changes: 2 additions & 1 deletion catalog/inserter.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ type rowInserter struct {
db string
table string
schema sql.Schema
hasPK bool
replace bool

once sync.Once
Expand Down Expand Up @@ -69,7 +70,7 @@ func (ri *rowInserter) init(ctx *sql.Context) {

insert.Reset()
insert.WriteString("INSERT ")
if ri.replace {
if ri.replace && ri.hasPK {
insert.WriteString(" OR REPLACE")
}
insert.WriteString(" INTO ")
Expand Down
Loading

0 comments on commit 4c8a81d

Please sign in to comment.