Skip to content

Commit

Permalink
Address data race in TestSchemaVersioning
Browse files Browse the repository at this point in the history
This resulted from the fact that the same ctx variable/pointer
is re-used in the test and because we did not wait for the
goroutine to end once the previous context was cancelled
before we overwrote the variable/pointer with a new context
those two goroutines were racing on reads and writes of the address.

Signed-off-by: Matt Lord <[email protected]>
  • Loading branch information
mattlord committed May 23, 2024
1 parent 0d8ca1b commit 4e35f5f
Showing 1 changed file with 11 additions and 0 deletions.
11 changes: 11 additions & 0 deletions go/vt/vttablet/endtoend/vstreamer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"errors"
"fmt"
"strings"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -65,6 +66,7 @@ func TestSchemaVersioning(t *testing.T) {

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
wg := sync.WaitGroup{}
tsv.EnableHistorian(true)
tsv.SetTracking(true)
time.Sleep(100 * time.Millisecond) // wait for _vt tables to be created
Expand Down Expand Up @@ -155,7 +157,9 @@ func TestSchemaVersioning(t *testing.T) {
}
return nil
}
wg.Add(1)
go func() {
defer wg.Done()
defer close(eventCh)
req := &binlogdatapb.VStreamRequest{Target: target, Position: "current", TableLastPKs: nil, Filter: filter}
if err := tsv.VStream(ctx, req, send); err != nil {
Expand Down Expand Up @@ -186,6 +190,7 @@ func TestSchemaVersioning(t *testing.T) {
}
runCases(ctx, t, cases, eventCh)
cancel()
wg.Wait()

log.Infof("\n\n\n=============================================== PAST EVENTS WITH TRACK VERSIONS START HERE ======================\n\n\n")
ctx, cancel = context.WithCancel(context.Background())
Expand Down Expand Up @@ -214,7 +219,9 @@ func TestSchemaVersioning(t *testing.T) {
}
return nil
}
wg.Add(1)
go func() {
defer wg.Done()
defer close(eventCh)
req := &binlogdatapb.VStreamRequest{Target: target, Position: startPos, TableLastPKs: nil, Filter: filter}
if err := tsv.VStream(ctx, req, send); err != nil {
Expand Down Expand Up @@ -257,6 +264,7 @@ func TestSchemaVersioning(t *testing.T) {
expectLogs(ctx, t, "Past stream", eventCh, output)

cancel()
wg.Wait()

log.Infof("\n\n\n=============================================== PAST EVENTS WITHOUT TRACK VERSIONS START HERE ======================\n\n\n")
tsv.EnableHistorian(false)
Expand Down Expand Up @@ -286,7 +294,9 @@ func TestSchemaVersioning(t *testing.T) {
}
return nil
}
wg.Add(1)
go func() {
defer wg.Done()
defer close(eventCh)
req := &binlogdatapb.VStreamRequest{Target: target, Position: startPos, TableLastPKs: nil, Filter: filter}
if err := tsv.VStream(ctx, req, send); err != nil {
Expand Down Expand Up @@ -331,6 +341,7 @@ func TestSchemaVersioning(t *testing.T) {

expectLogs(ctx, t, "Past stream", eventCh, output)
cancel()
wg.Wait()

client := framework.NewClient()
client.Execute("drop table vitess_version", nil)
Expand Down

0 comments on commit 4e35f5f

Please sign in to comment.