Skip to content

Commit

Permalink
Address data races on memorytopo.closed
Browse files Browse the repository at this point in the history
This resolves the seen data races on the variable.

Signed-off-by: Matt Lord <[email protected]>
  • Loading branch information
mattlord committed Feb 26, 2024
1 parent 491e416 commit 2974d0a
Show file tree
Hide file tree
Showing 4 changed files with 11 additions and 10 deletions.
8 changes: 4 additions & 4 deletions go/vt/topo/memorytopo/election.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import (
func (c *Conn) NewLeaderParticipation(name, id string) (topo.LeaderParticipation, error) {
c.factory.callstats.Add([]string{"NewLeaderParticipation"}, 1)

if c.closed {
if c.closed.Load() {
return nil, ErrConnectionClosed
}

Expand Down Expand Up @@ -74,7 +74,7 @@ type cLeaderParticipation struct {

// WaitForLeadership is part of the topo.LeaderParticipation interface.
func (mp *cLeaderParticipation) WaitForLeadership() (context.Context, error) {
if mp.c.closed {
if mp.c.closed.Load() {
return nil, ErrConnectionClosed
}

Expand Down Expand Up @@ -122,7 +122,7 @@ func (mp *cLeaderParticipation) Stop() {

// GetCurrentLeaderID is part of the topo.LeaderParticipation interface
func (mp *cLeaderParticipation) GetCurrentLeaderID(ctx context.Context) (string, error) {
if mp.c.closed {
if mp.c.closed.Load() {
return "", ErrConnectionClosed
}

Expand All @@ -141,7 +141,7 @@ func (mp *cLeaderParticipation) GetCurrentLeaderID(ctx context.Context) (string,

// WaitForNewLeader is part of the topo.LeaderParticipation interface
func (mp *cLeaderParticipation) WaitForNewLeader(ctx context.Context) (<-chan string, error) {
if mp.c.closed {
if mp.c.closed.Load() {
return nil, ErrConnectionClosed
}

Expand Down
2 changes: 1 addition & 1 deletion go/vt/topo/memorytopo/lock.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ func (ld *memoryTopoLockDescriptor) Unlock(ctx context.Context) error {
}

func (c *Conn) unlock(ctx context.Context, dirPath string) error {
if c.closed {
if c.closed.Load() {
return ErrConnectionClosed
}

Expand Down
7 changes: 4 additions & 3 deletions go/vt/topo/memorytopo/memorytopo.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"math/rand"
"strings"
"sync"
"sync/atomic"

"vitess.io/vitess/go/stats"
"vitess.io/vitess/go/vt/log"
Expand Down Expand Up @@ -134,13 +135,13 @@ type Conn struct {
factory *Factory
cell string
serverAddr string
closed bool
closed atomic.Bool
}

// dial returns immediately, unless the Conn points to the sentinel
// UnreachableServerAddr, in which case it will block until the context expires.
func (c *Conn) dial(ctx context.Context) error {
if c.closed {
if c.closed.Load() {
return ErrConnectionClosed
}
if c.serverAddr == UnreachableServerAddr {
Expand All @@ -153,7 +154,7 @@ func (c *Conn) dial(ctx context.Context) error {
// Close is part of the topo.Conn interface.
func (c *Conn) Close() {
c.factory.callstats.Add([]string{"Close"}, 1)
c.closed = true
c.closed.Store(true)
}

type watch struct {
Expand Down
4 changes: 2 additions & 2 deletions go/vt/topo/memorytopo/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (
func (c *Conn) Watch(ctx context.Context, filePath string) (*topo.WatchData, <-chan *topo.WatchData, error) {
c.factory.callstats.Add([]string{"Watch"}, 1)

if c.closed {
if c.closed.Load() {
return nil, nil, ErrConnectionClosed
}

Expand Down Expand Up @@ -79,7 +79,7 @@ func (c *Conn) Watch(ctx context.Context, filePath string) (*topo.WatchData, <-c
func (c *Conn) WatchRecursive(ctx context.Context, dirpath string) ([]*topo.WatchDataRecursive, <-chan *topo.WatchDataRecursive, error) {
c.factory.callstats.Add([]string{"WatchRecursive"}, 1)

if c.closed {
if c.closed.Load() {
return nil, nil, ErrConnectionClosed
}

Expand Down

0 comments on commit 2974d0a

Please sign in to comment.