Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

VReplication: Move ENUM and SET mappings from vplayer to vstreamer #15723

Merged
merged 43 commits into from
May 9, 2024
Merged
Show file tree
Hide file tree
Changes from 33 commits
Commits
Show all changes
43 commits
Select commit Hold shift + click to select a range
2200fcc
Move enum and set mappings to vstreamer
mattlord Apr 15, 2024
0fee7d4
Fix unit test and optimize code
mattlord Apr 16, 2024
a6b13c8
Correct and optimize set handling
mattlord Apr 16, 2024
c12a878
Avoid *any* additional allocations in SET handling
mattlord Apr 16, 2024
b663601
More testing and wip
mattlord Apr 16, 2024
a540c3b
Improve error handling
mattlord Apr 16, 2024
8da07ec
Merge remote-tracking branch 'origin/main' into vstream_enum_strings
mattlord Apr 16, 2024
1f10e2a
Add JiT mapping support
mattlord Apr 17, 2024
96d50f5
Update unit test framework
mattlord Apr 17, 2024
f4ea9e5
Properly handle ENUM/SET types when binary collation is used
mattlord Apr 17, 2024
5314195
Minor changes and test improvements after self review
mattlord Apr 18, 2024
8339329
Merge remote-tracking branch 'origin/main' into vstream_enum_strings
mattlord Apr 18, 2024
3d42461
Very minor improvement to dialDedicatedPool's invalidator
mattlord Apr 18, 2024
c178406
Final self review and cleanup
mattlord Apr 18, 2024
e1e3774
Fix evalengine.NullSafeCompare when comparing an ENUM/SET type to a C…
mattlord Apr 18, 2024
1c1140a
Test cumulative fixes for mapping and comparing
mattlord Apr 19, 2024
01d7894
Add ENUM <-> VARCHAR evalengine compare test cases
mattlord Apr 19, 2024
822410d
Minor improvement to e2e test
mattlord Apr 19, 2024
cfec6de
Minor comment improvements
mattlord Apr 19, 2024
2b943bf
Add vstream FieldEvent member to signify values are strings
mattlord Apr 23, 2024
e3aae77
Adjust other tests
mattlord Apr 23, 2024
13fe9b2
Merge remote-tracking branch 'origin/main' into vstream_enum_strings
mattlord Apr 24, 2024
6435392
Properly calculate field length for SET columns in unit tests
mattlord Apr 24, 2024
b8e8132
Update type comment
mattlord Apr 24, 2024
53b4ab4
Revert "Properly calculate field length for SET columns in unit tests"
mattlord Apr 24, 2024
d2bfdae
remove enumToTextMap in onlineddl/vrepl.go since rule.ConvertEnumToTe…
shlomi-noach May 1, 2024
349ee79
Revert "remove enumToTextMap in onlineddl/vrepl.go since rule.Convert…
shlomi-noach May 1, 2024
aa3359a
Address review comments
mattlord May 1, 2024
e4db066
Add NULL handling to test framework
mattlord May 1, 2024
3084a93
Minor changes after self review of recent commits
mattlord May 1, 2024
ccf1729
Test self nits
mattlord May 1, 2024
65c1ed4
Event string is more than the column info
mattlord May 1, 2024
6510631
Some additional minor unit test framework improvements
mattlord May 1, 2024
aa518ea
Remove unnecessary else block
mattlord May 2, 2024
647de32
Improve unit test
mattlord May 2, 2024
f5681ac
Fix some typos on another self-review
mattlord May 2, 2024
0c62783
Another comment improvement
mattlord May 2, 2024
98374d2
Further comment improvement (nitting myself to deth, send halp)
mattlord May 2, 2024
02b6642
Add release notes
mattlord May 8, 2024
0c98605
Tweaking the release notes
mattlord May 8, 2024
edf9fae
More release notes tweaks
mattlord May 8, 2024
0574d2d
Final set of minor release notes tweaks
mattlord May 9, 2024
1584903
Comparison blocks look terrible using json as the language in GitHub …
mattlord May 9, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 17 additions & 11 deletions go/test/endtoend/vreplication/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,11 @@ limitations under the License.

package vreplication

import (
"fmt"
"strings"
)

// The product, customer, Lead, Lead-1 tables are used to exercise and test most Workflow variants.
// We violate the NO_ZERO_DATES and NO_ZERO_IN_DATE sql_modes that are enabled by default in
// MySQL 5.7+ and MariaDB 10.2+ to ensure that vreplication still works everywhere and the
Expand All @@ -40,9 +45,10 @@ package vreplication
// default collation as it has to work across versions and the 8.0 default does not exist in 5.7.
var (
// All standard user tables should have a primary key and at least one secondary key.
initialProductSchema = `
customerTypes = []string{"'individual'", "'soho'", "'enterprise'"}
initialProductSchema = fmt.Sprintf(`
create table product(pid int, description varbinary(128), date1 datetime not null default '0000-00-00 00:00:00', date2 datetime not null default '2021-00-01 00:00:00', primary key(pid), key(date1,date2)) CHARSET=utf8mb4;
create table customer(cid int auto_increment, name varchar(128) collate utf8mb4_bin, meta json default null, typ enum('individual','soho','enterprise'), sport set('football','cricket','baseball'),
create table customer(cid int auto_increment, name varchar(128) collate utf8mb4_bin, meta json default null, typ enum(%s), sport set('football','cricket','baseball'),
ts timestamp not null default current_timestamp, bits bit(2) default b'11', date1 datetime not null default '0000-00-00 00:00:00',
date2 datetime not null default '2021-00-01 00:00:00', dec80 decimal(8,0), blb blob, primary key(cid,typ), key(name)) CHARSET=utf8mb4;
create table customer_seq(id int, next_id bigint, cache bigint, primary key(id)) comment 'vitess_sequence';
Expand All @@ -51,19 +57,19 @@ create table orders(oid int, cid int, pid int, mname varchar(128), price int, qt
create table order_seq(id int, next_id bigint, cache bigint, primary key(id)) comment 'vitess_sequence';
create table customer2(cid int, name varchar(128), typ enum('individual','soho','enterprise'), sport set('football','cricket','baseball'),ts timestamp not null default current_timestamp, primary key(cid), key(ts)) CHARSET=utf8;
create table customer_seq2(id int, next_id bigint, cache bigint, primary key(id)) comment 'vitess_sequence';
create table ` + "`Lead`(`Lead-id`" + ` binary(16), name varbinary(16), date1 datetime not null default '0000-00-00 00:00:00', date2 datetime not null default '2021-00-01 00:00:00', primary key (` + "`Lead-id`" + `), key (date1));
create table ` + "`Lead-1`(`Lead`" + ` binary(16), name varbinary(16), date1 datetime not null default '0000-00-00 00:00:00', date2 datetime not null default '2021-00-01 00:00:00', primary key (` + "`Lead`" + `), key (date2));
create table `+"`Lead`(`Lead-id`"+` binary(16), name varbinary(16), date1 datetime not null default '0000-00-00 00:00:00', date2 datetime not null default '2021-00-01 00:00:00', primary key (`+"`Lead-id`"+`), key (date1));
create table `+"`Lead-1`(`Lead`"+` binary(16), name varbinary(16), date1 datetime not null default '0000-00-00 00:00:00', date2 datetime not null default '2021-00-01 00:00:00', primary key (`+"`Lead`"+`), key (date2));
create table _vt_PURGE_4f9194b43b2011eb8a0104ed332e05c2_20221210194431(id int, val varbinary(128), primary key(id), key(val));
create table db_order_test (c_uuid varchar(64) not null default '', created_at datetime not null, dstuff varchar(128), dtstuff text, dbstuff blob, cstuff char(32), primary key (c_uuid,created_at), key (dstuff)) CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;
create table vdiff_order (order_id varchar(50) collate utf8mb4_unicode_ci not null, primary key (order_id), key (order_id)) charset=utf8mb4 COLLATE=utf8mb4_unicode_ci;
create table datze (id int, dt1 datetime not null default current_timestamp, dt2 datetime not null, ts1 timestamp default current_timestamp, primary key (id), key (dt1));
create table json_tbl (id int, j1 json, j2 json, j3 json not null, primary key(id));
create table geom_tbl (id int, g geometry, p point, ls linestring, pg polygon, mp multipoint, mls multilinestring, mpg multipolygon, gc geometrycollection, primary key(id));
create table ` + "`blüb_tbl`" + ` (id int, val1 varchar(20), ` + "`blöb1`" + ` blob, val2 varbinary(20), ` + "`bl@b2`" + ` longblob, txt1 text, blb3 tinyblob, txt2 longtext, blb4 mediumblob, primary key(id));
create table `+"`blüb_tbl`"+` (id int, val1 varchar(20), `+"`blöb1`"+` blob, val2 varbinary(20), `+"`bl@b2`"+` longblob, txt1 text, blb3 tinyblob, txt2 longtext, blb4 mediumblob, primary key(id));
create table reftable (id int, val1 varchar(20), primary key(id), key(val1));
create table loadtest (id int, name varchar(256), primary key(id), key(name));
create table nopk (name varchar(128), age int unsigned);
`
`, strings.Join(customerTypes, ","))
// These should always be ignored in vreplication
internalSchema = `
create table _1e275eef_3b20_11eb_a38f_04ed332e05c2_20201210204529_gho(id int, val varbinary(128), primary key(id));
Expand Down Expand Up @@ -152,7 +158,7 @@ create table nopk (name varchar(128), age int unsigned);
}
]
},
"customer_type": {
"enterprise_customer": {
"column_vindexes": [
{
"column": "cid",
Expand Down Expand Up @@ -434,13 +440,13 @@ create table nopk (name varchar(128), age int unsigned);

materializeCustomerTypeSpec = `
{
"workflow": "customer_type",
"workflow": "enterprise_customer",
"source_keyspace": "customer",
"target_keyspace": "customer",
"table_settings": [{
"target_table": "customer_type",
"source_expression": "select cid, typ from customer",
"create_ddl": "create table if not exists customer_type (cid bigint not null, typ enum('individual','soho','enterprise'), primary key(cid), key(typ))"
"target_table": "enterprise_customer",
"source_expression": "select cid, name, typ from customer where typ = 'enterprise'",
"create_ddl": "create table if not exists enterprise_customer (cid bigint not null, name varchar(128), typ varchar(64), primary key(cid), key(typ))"
}]
}
`
Expand Down
29 changes: 16 additions & 13 deletions go/test/endtoend/vreplication/resharding_workflows_v2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"encoding/json"
"fmt"
"math/rand/v2"
"net"
"strconv"
"strings"
Expand Down Expand Up @@ -539,7 +540,7 @@ func testReshardV2Workflow(t *testing.T) {

// Generate customer records in the background for the rest of the test
// in order to confirm that no writes are lost in either the customer
// table or the customer_name and customer_type materializations
// table or the customer_name and enterprise_customer materializations
// against it during the Reshard and all of the traffic switches.
dataGenCtx, dataGenCancel := context.WithCancel(context.Background())
defer dataGenCancel()
Expand All @@ -555,7 +556,9 @@ func testReshardV2Workflow(t *testing.T) {
case <-dataGenCtx.Done():
return
default:
_ = execVtgateQuery(t, dataGenConn, "customer", fmt.Sprintf("insert into customer (cid, name) values (%d, 'tempCustomer%d')", id, id))
// Use a random customer type for each record.
_ = execVtgateQuery(t, dataGenConn, "customer", fmt.Sprintf("insert into customer (cid, name, typ) values (%d, 'tempCustomer%d', %s)",
id, id, customerTypes[rand.IntN(len(customerTypes))]))
}
time.Sleep(1 * time.Millisecond)
id++
Expand Down Expand Up @@ -591,17 +594,17 @@ func testReshardV2Workflow(t *testing.T) {
cnres := execVtgateQuery(t, dataGenConn, "customer", "select count(*) from customer_name")
require.Len(t, cnres.Rows, 1)
require.EqualValues(t, cres.Rows, cnres.Rows)
waitForNoWorkflowLag(t, vc, "customer", "customer_type")
ctres := execVtgateQuery(t, dataGenConn, "customer", "select count(*) from customer_type")
require.Len(t, ctres.Rows, 1)
require.EqualValues(t, cres.Rows, ctres.Rows)
if debugMode {
t.Logf("Done inserting customer data. Record counts in customer: %s, customer_name: %s, customer_type: %s",
cres.Rows[0][0].ToString(), cnres.Rows[0][0].ToString(), ctres.Rows[0][0].ToString())
// We expect the row count to differ in enterpise_customer because it is
// using a `where typ='enterprise'` filter. So the count is only for debug
// info.
ecres := execVtgateQuery(t, dataGenConn, "customer", "select count(*) from enterprise_customer")
t.Logf("Done inserting customer data. Record counts in customer: %s, customer_name: %s, enterprise_customer: %s",
cres.Rows[0][0].ToString(), cnres.Rows[0][0].ToString(), ecres.Rows[0][0].ToString())
}
// We also do a vdiff on the materialize workflows for good measure.
doVtctldclientVDiff(t, "customer", "customer_name", "", nil)
doVtctldclientVDiff(t, "customer", "customer_type", "", nil)
doVtctldclientVDiff(t, "customer", "enterprise_customer", "", nil)
}

func testMoveTablesV2Workflow(t *testing.T) {
Expand Down Expand Up @@ -669,7 +672,7 @@ func testMoveTablesV2Workflow(t *testing.T) {

output, err = vc.VtctldClient.ExecuteCommandWithOutput(listAllArgs...)
require.NoError(t, err)
require.True(t, listOutputContainsWorkflow(output, "customer_name") && listOutputContainsWorkflow(output, "customer_type") && !listOutputContainsWorkflow(output, "wf1"))
require.True(t, listOutputContainsWorkflow(output, "customer_name") && listOutputContainsWorkflow(output, "enterprise_customer") && !listOutputContainsWorkflow(output, "wf1"))

testVSchemaForSequenceAfterMoveTables(t)

Expand All @@ -684,14 +687,14 @@ func testMoveTablesV2Workflow(t *testing.T) {
createMoveTablesWorkflow(t, "Lead,Lead-1")
output, err = vc.VtctldClient.ExecuteCommandWithOutput(listAllArgs...)
require.NoError(t, err)
require.True(t, listOutputContainsWorkflow(output, "wf1") && listOutputContainsWorkflow(output, "customer_name") && listOutputContainsWorkflow(output, "customer_type"))
require.True(t, listOutputContainsWorkflow(output, "wf1") && listOutputContainsWorkflow(output, "customer_name") && listOutputContainsWorkflow(output, "enterprise_customer"))

err = tstWorkflowCancel(t)
require.NoError(t, err)

output, err = vc.VtctldClient.ExecuteCommandWithOutput(listAllArgs...)
require.NoError(t, err)
require.True(t, listOutputContainsWorkflow(output, "customer_name") && listOutputContainsWorkflow(output, "customer_type") && !listOutputContainsWorkflow(output, "wf1"))
require.True(t, listOutputContainsWorkflow(output, "customer_name") && listOutputContainsWorkflow(output, "enterprise_customer") && !listOutputContainsWorkflow(output, "wf1"))
}

func testPartialSwitches(t *testing.T) {
Expand Down Expand Up @@ -812,7 +815,7 @@ func testRestOfWorkflow(t *testing.T) {
// fully switch and complete
waitForLowLag(t, "customer", "wf1")
waitForLowLag(t, "customer", "customer_name")
waitForLowLag(t, "customer", "customer_type")
waitForLowLag(t, "customer", "enterprise_customer")
tstWorkflowSwitchReadsAndWrites(t)
validateReadsRoute(t, "rdonly", targetRdonlyTab1)
validateReadsRouteToTarget(t, "replica")
Expand Down
Loading
Loading