Skip to content

Commit

Permalink
[release-15.0] Enable failures in tools/e2e_test_race.sh and fix ra…
Browse files Browse the repository at this point in the history
…ces (vitessio#13654) (vitessio#14009)

Signed-off-by: Florent Poinsard <[email protected]>
Signed-off-by: Brendan Dougherty <[email protected]>
Signed-off-by: Austen Lacy <[email protected]>
Signed-off-by: Rohit Nayak <[email protected]>
Co-authored-by: vitess-bot[bot] <108069721+vitess-bot[bot]@users.noreply.github.com>
Co-authored-by: Brendan Dougherty <[email protected]>
Co-authored-by: Austen Lacy <[email protected]>
Co-authored-by: Rohit Nayak <[email protected]>
  • Loading branch information
4 people authored Oct 2, 2023
1 parent bcfca87 commit 1814eaf
Show file tree
Hide file tree
Showing 11 changed files with 134 additions and 85 deletions.
18 changes: 15 additions & 3 deletions go/vt/vtgate/endtoend/vstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"fmt"
"io"
"regexp"
"sync"
"testing"

Expand Down Expand Up @@ -330,9 +331,9 @@ func TestVStreamSharded(t *testing.T) {
received bool
}
expectedEvents := []*expectedEvent{
{`type:FIELD field_event:{table_name:"ks.t1_sharded" fields:{name:"id1" type:INT64 table:"t1_sharded" org_table:"t1_sharded" database:"vt_ks_-80" org_name:"id1" column_length:20 charset:63 flags:53251} fields:{name:"id2" type:INT64 table:"t1_sharded" org_table:"t1_sharded" database:"vt_ks_-80" org_name:"id2" column_length:20 charset:63 flags:32768} keyspace:"ks" shard:"-80"}`, false},
{`type:FIELD field_event:{table_name:"ks.t1_sharded" fields:{name:"id1" type:INT64 table:"t1_sharded" org_table:"t1_sharded" database:"vt_ks_-80" org_name:"id1" column_length:20 charset:63 flags:53251 column_type:"bigint(20)"} fields:{name:"id2" type:INT64 table:"t1_sharded" org_table:"t1_sharded" database:"vt_ks_-80" org_name:"id2" column_length:20 charset:63 flags:32768 column_type:"bigint(20)"} keyspace:"ks" shard:"-80"}`, false},
{`type:ROW row_event:{table_name:"ks.t1_sharded" row_changes:{after:{lengths:1 lengths:1 values:"11"}} keyspace:"ks" shard:"-80"}`, false},
{`type:FIELD field_event:{table_name:"ks.t1_sharded" fields:{name:"id1" type:INT64 table:"t1_sharded" org_table:"t1_sharded" database:"vt_ks_80-" org_name:"id1" column_length:20 charset:63 flags:53251} fields:{name:"id2" type:INT64 table:"t1_sharded" org_table:"t1_sharded" database:"vt_ks_80-" org_name:"id2" column_length:20 charset:63 flags:32768} keyspace:"ks" shard:"80-"}`, false},
{`type:FIELD field_event:{table_name:"ks.t1_sharded" fields:{name:"id1" type:INT64 table:"t1_sharded" org_table:"t1_sharded" database:"vt_ks_80-" org_name:"id1" column_length:20 charset:63 flags:53251 column_type:"bigint(20)"} fields:{name:"id2" type:INT64 table:"t1_sharded" org_table:"t1_sharded" database:"vt_ks_80-" org_name:"id2" column_length:20 charset:63 flags:32768 column_type:"bigint(20)"} keyspace:"ks" shard:"80-"}`, false},
{`type:ROW row_event:{table_name:"ks.t1_sharded" row_changes:{after:{lengths:1 lengths:1 values:"44"}} keyspace:"ks" shard:"80-"}`, false},
}
for {
Expand All @@ -357,7 +358,7 @@ func TestVStreamSharded(t *testing.T) {
for _, ev := range evs {
s := fmt.Sprintf("%v", ev)
for _, expectedEv := range expectedEvents {
if expectedEv.ev == s {
if removeAnyDeprecatedDisplayWidths(expectedEv.ev) == removeAnyDeprecatedDisplayWidths(s) {
expectedEv.received = true
break
}
Expand All @@ -381,6 +382,17 @@ func TestVStreamSharded(t *testing.T) {

}

func removeAnyDeprecatedDisplayWidths(orig string) string {
var adjusted string
baseIntType := "int"
intRE := regexp.MustCompile(`(?i)int\(([0-9]*)?\)`)
adjusted = intRE.ReplaceAllString(orig, baseIntType)
baseYearType := "year"
yearRE := regexp.MustCompile(`(?i)year\(([0-9]*)?\)`)
adjusted = yearRE.ReplaceAllString(adjusted, baseYearType)
return adjusted
}

var printMu sync.Mutex

func printEvents(evs []*binlogdatapb.VEvent) {
Expand Down
6 changes: 6 additions & 0 deletions go/vt/vttablet/endtoend/framework/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package framework
import (
"context"
"errors"
"sync"
"time"

"google.golang.org/protobuf/proto"
Expand All @@ -40,6 +41,7 @@ type QueryClient struct {
target *querypb.Target
server *tabletserver.TabletServer
transactionID int64
reservedIDMu sync.Mutex
reservedID int64
sessionStateChanges string
}
Expand Down Expand Up @@ -114,6 +116,8 @@ func (client *QueryClient) Commit() error {
func (client *QueryClient) Rollback() error {
defer func() { client.transactionID = 0 }()
rID, err := client.server.Rollback(client.ctx, client.target, client.transactionID)
client.reservedIDMu.Lock()
defer client.reservedIDMu.Unlock()
client.reservedID = rID
if err != nil {
return err
Expand Down Expand Up @@ -293,6 +297,8 @@ func (client *QueryClient) MessageAck(name string, ids []string) (int64, error)

// ReserveExecute performs a ReserveExecute.
func (client *QueryClient) ReserveExecute(query string, preQueries []string, bindvars map[string]*querypb.BindVariable) (*sqltypes.Result, error) {
client.reservedIDMu.Lock()
defer client.reservedIDMu.Unlock()
if client.reservedID != 0 {
return nil, errors.New("already reserved a connection")
}
Expand Down
12 changes: 6 additions & 6 deletions go/vt/vttablet/endtoend/transaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,7 @@ func TestShutdownGracePeriod(t *testing.T) {
err := client.Begin(false)
require.NoError(t, err)
go func() {
_, err = client.Execute("select sleep(10) from dual", nil)
_, err := client.Execute("select sleep(10) from dual", nil)
assert.Error(t, err)
}()

Expand All @@ -346,7 +346,7 @@ func TestShutdownGracePeriod(t *testing.T) {
err = client.Begin(false)
require.NoError(t, err)
go func() {
_, err = client.Execute("select sleep(11) from dual", nil)
_, err := client.Execute("select sleep(11) from dual", nil)
assert.Error(t, err)
}()

Expand All @@ -373,7 +373,7 @@ func TestShutdownGracePeriodWithStreamExecute(t *testing.T) {
err := client.Begin(false)
require.NoError(t, err)
go func() {
_, err = client.StreamExecute("select sleep(10) from dual", nil)
_, err := client.StreamExecute("select sleep(10) from dual", nil)
assert.Error(t, err)
}()

Expand All @@ -398,7 +398,7 @@ func TestShutdownGracePeriodWithStreamExecute(t *testing.T) {
err = client.Begin(false)
require.NoError(t, err)
go func() {
_, err = client.StreamExecute("select sleep(11) from dual", nil)
_, err := client.StreamExecute("select sleep(11) from dual", nil)
assert.Error(t, err)
}()

Expand All @@ -425,7 +425,7 @@ func TestShutdownGracePeriodWithReserveExecute(t *testing.T) {
err := client.Begin(false)
require.NoError(t, err)
go func() {
_, err = client.ReserveExecute("select sleep(10) from dual", nil, nil)
_, err := client.ReserveExecute("select sleep(10) from dual", nil, nil)
assert.Error(t, err)
}()

Expand All @@ -450,7 +450,7 @@ func TestShutdownGracePeriodWithReserveExecute(t *testing.T) {
err = client.Begin(false)
require.NoError(t, err)
go func() {
_, err = client.ReserveExecute("select sleep(11) from dual", nil, nil)
_, err := client.ReserveExecute("select sleep(11) from dual", nil, nil)
assert.Error(t, err)
}()

Expand Down
26 changes: 26 additions & 0 deletions go/vt/vttablet/tabletserver/schema/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -593,6 +593,32 @@ func (se *Engine) GetSchema() map[string]*Table {
return tables
}

// MarshalMinimalSchema returns a protobuf encoded binlogdata.MinimalSchema
func (se *Engine) MarshalMinimalSchema() ([]byte, error) {
se.mu.Lock()
defer se.mu.Unlock()
dbSchema := &binlogdatapb.MinimalSchema{
Tables: make([]*binlogdatapb.MinimalTable, 0, len(se.tables)),
}
for _, table := range se.tables {
dbSchema.Tables = append(dbSchema.Tables, newMinimalTable(table))
}
return dbSchema.MarshalVT()
}

func newMinimalTable(st *Table) *binlogdatapb.MinimalTable {
table := &binlogdatapb.MinimalTable{
Name: st.Name.String(),
Fields: st.Fields,
}
pkc := make([]int64, len(st.PKColumns))
for i, pk := range st.PKColumns {
pkc[i] = int64(pk)
}
table.PKColumns = pkc
return table
}

// GetConnection returns a connection from the pool
func (se *Engine) GetConnection(ctx context.Context) (*connpool.DBConn, error) {
return se.conns.Get(ctx, nil)
Expand Down
25 changes: 3 additions & 22 deletions go/vt/vttablet/tabletserver/schema/tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@ import (
"sync"
"time"

"google.golang.org/protobuf/proto"

"vitess.io/vitess/go/vt/schema"

"vitess.io/vitess/go/mysql"
Expand Down Expand Up @@ -240,14 +238,10 @@ func (tr *Tracker) schemaUpdated(gtid string, ddl string, timestamp int64) error
}

func (tr *Tracker) saveCurrentSchemaToDb(ctx context.Context, gtid, ddl string, timestamp int64) error {
tables := tr.engine.GetSchema()
dbSchema := &binlogdatapb.MinimalSchema{
Tables: []*binlogdatapb.MinimalTable{},
}
for _, table := range tables {
dbSchema.Tables = append(dbSchema.Tables, newMinimalTable(table))
blob, err := tr.engine.MarshalMinimalSchema()
if err != nil {
return err
}
blob, _ := proto.Marshal(dbSchema)

conn, err := tr.engine.GetConnection(ctx)
if err != nil {
Expand All @@ -265,19 +259,6 @@ func (tr *Tracker) saveCurrentSchemaToDb(ctx context.Context, gtid, ddl string,
return nil
}

func newMinimalTable(st *Table) *binlogdatapb.MinimalTable {
table := &binlogdatapb.MinimalTable{
Name: st.Name.String(),
Fields: st.Fields,
}
var pkc []int64
for _, pk := range st.PKColumns {
pkc = append(pkc, int64(pk))
}
table.PKColumns = pkc
return table
}

func encodeString(in string) string {
buf := bytes.NewBuffer(nil)
sqltypes.NewVarChar(in).EncodeSQL(buf)
Expand Down
5 changes: 4 additions & 1 deletion go/vt/vttablet/tabletserver/throttle/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@ type Client struct {
checkType ThrottleCheckType
flags CheckFlags

lastSuccessfulThrottle int64
lastSuccessfulThrottleMu sync.Mutex
lastSuccessfulThrottle int64
}

// NewProductionClient creates a client suitable for foreground/production jobs, which have normal priority.
Expand Down Expand Up @@ -94,6 +95,8 @@ func (c *Client) ThrottleCheckOK(ctx context.Context, overrideAppName string) (t
// no throttler
return true
}
c.lastSuccessfulThrottleMu.Lock()
defer c.lastSuccessfulThrottleMu.Unlock()
if c.lastSuccessfulThrottle >= atomic.LoadInt64(&throttleTicks) {
// if last check was OK just very recently there is no need to check again
return true
Expand Down
9 changes: 7 additions & 2 deletions go/vt/vttablet/tabletserver/vstreamer/rowstreamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,9 +153,14 @@ func (rs *rowStreamer) buildPlan() error {
return err
}
ti := &Table{
Name: st.Name,
Fields: st.Fields,
Name: st.Name,
}

ti.Fields, err = getFields(rs.ctx, rs.cp, st.Name, rs.cp.DBName(), st.Fields)
if err != nil {
return err
}

// The plan we build is identical to the one for vstreamer.
// This is because the row format of a read is identical
// to the row format of a binlog event. So, the same
Expand Down
Loading

0 comments on commit 1814eaf

Please sign in to comment.