Skip to content

Commit

Permalink
Add start and stop volumeWatcher (#2320)
Browse files Browse the repository at this point in the history
* Add start and stop volumeWatcher

- Adds two function in Watcher interface and fix tests

JIRA: PWX-32359

Signed-off-by: dahuang <[email protected]>
  • Loading branch information
dahuang-purestorage authored Aug 17, 2023
1 parent fa4a9b2 commit 8015fc0
Show file tree
Hide file tree
Showing 12 changed files with 85 additions and 2 deletions.
8 changes: 8 additions & 0 deletions api/client/volume/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,10 +75,18 @@ func (v *volumeClient) GraphDriverRemove(id string) error {
return nil
}

func (v *volumeClient) StartVolumeWatcher() {
return
}

func (v *volumeClient) GetVolumeWatcher(locator *api.VolumeLocator, labels map[string]string) (chan *api.Volume, error) {
return nil, nil
}

func (v *volumeClient) StopVolumeWatcher() {
return
}

func (v *volumeClient) GraphDriverGet(id string, mountLabel string) (string, error) {
response := ""
if err := v.c.Get().Resource(graphPath + "/inspect").Instance(id).Do().Unmarshal(&response); err != nil {
Expand Down
4 changes: 3 additions & 1 deletion api/server/sdk/sdk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ func newTestServer(t *testing.T) *testServer {

assert.Nil(t, err)

tester.m.EXPECT().StartVolumeWatcher().Return().Times(1)
tester.m.EXPECT().GetVolumeWatcher(&api.VolumeLocator{}, make(map[string]string)).DoAndReturn(func(a *api.VolumeLocator, l map[string]string) (chan *api.Volume, error) {
ch := make(chan *api.Volume, 1)
tester.server.watcherCtxCancel()
Expand Down Expand Up @@ -233,7 +234,7 @@ func newTestServerAuth(t *testing.T) *testServer {
},
})
assert.Nil(t, err)

tester.m.EXPECT().StartVolumeWatcher().Return().Times(1)
tester.m.EXPECT().GetVolumeWatcher(&api.VolumeLocator{}, make(map[string]string)).DoAndReturn(func(a *api.VolumeLocator, l map[string]string) (chan *api.Volume, error) {
ch := make(chan *api.Volume, 1)
tester.server.watcherCtxCancel()
Expand Down Expand Up @@ -294,6 +295,7 @@ func (s *testServer) Stop() {

// Shutdown servers
s.conn.Close()
s.m.EXPECT().StopVolumeWatcher().Return().AnyTimes()
s.server.Stop()
s.gw.Close()

Expand Down
1 change: 1 addition & 0 deletions api/server/sdk/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,7 @@ func (s *Server) Stop() {
s.netServer.Stop()
s.udsServer.Stop()
s.restGateway.Stop()
s.netServer.watcherServer.stopWatcher(s.watcherCtx)
s.watcherCtxCancel()

if s.accessLog != nil {
Expand Down
6 changes: 5 additions & 1 deletion api/server/sdk/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,10 @@ func (s *WatcherServer) removeWatcher(name string, eventType string) {
s.watchConnections[eventType] = newWatchers
}

func (s *WatcherServer) stopWatcher(ctx context.Context) {
s.volumeServer.driver(ctx).StopVolumeWatcher()
}

func (s *WatcherServer) startWatcher(ctx context.Context) error {
group, _ := errgroup.WithContext(ctx)
errChan := make(chan error)
Expand Down Expand Up @@ -121,7 +125,7 @@ func (s *WatcherServer) startVolumeWatcher(ctx context.Context) error {
time.Sleep(2 * time.Second)
continue
}

s.volumeServer.driver(ctx).StartVolumeWatcher()
volumeChannel, err := s.volumeServer.driver(ctx).GetVolumeWatcher(&api.VolumeLocator{}, make(map[string]string))
if err != nil {
logrus.Warnf("Error getting volume watcher %v", err)
Expand Down
1 change: 1 addition & 0 deletions csi/csi_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,7 @@ func (s *testServer) Stop() {

// Shutdown servers
s.conn.Close()
s.m.EXPECT().StopVolumeWatcher().Return().AnyTimes()
s.server.Stop()
s.sdk.Stop()

Expand Down
8 changes: 8 additions & 0 deletions volume/drivers/buse/buse.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,10 +146,18 @@ func Init(params map[string]string) (volume.VolumeDriver, error) {
// These functions below implement the volume driver interface.
//

func (d *driver) StartVolumeWatcher() {
return
}

func (d *driver) GetVolumeWatcher(locator *api.VolumeLocator, labels map[string]string) (chan *api.Volume, error) {
return nil, nil
}

func (d *driver) StopVolumeWatcher() {
return
}

func (d *driver) String() string {
return Name
}
Expand Down
8 changes: 8 additions & 0 deletions volume/drivers/fake/fake.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,10 @@ func volumeGenerator(d *driver) {
}
}

func (d *driver) StartVolumeWatcher() {
return
}

func (d *driver) GetVolumeWatcher(locator *api.VolumeLocator, labels map[string]string) (chan *api.Volume, error) {
go volumeGenerator(d)
if d.volumeChannel == nil {
Expand All @@ -150,6 +154,10 @@ func (d *driver) GetVolumeWatcher(locator *api.VolumeLocator, labels map[string]
return d.volumeChannel, nil
}

func (d *driver) StopVolumeWatcher() {
return
}

func (d *driver) Name() string {
return Name
}
Expand Down
7 changes: 7 additions & 0 deletions volume/drivers/fuse/volume_driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,18 @@ func newVolumeDriver(
provider,
}
}
func (v *volumeDriver) StartVolumeWatcher() {
return
}

func (v *volumeDriver) GetVolumeWatcher(locator *api.VolumeLocator, labels map[string]string) (chan *api.Volume, error) {
return nil, nil
}

func (v *volumeDriver) StopVolumeWatcher() {
return
}

func (v *volumeDriver) Name() string {
return v.name
}
Expand Down
24 changes: 24 additions & 0 deletions volume/drivers/mock/driver.mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 8 additions & 0 deletions volume/drivers/nfs/nfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,10 +183,18 @@ func Init(params map[string]string) (volume.VolumeDriver, error) {
return inst, nil
}

func (d *driver) StartVolumeWatcher() {
return
}

func (d *driver) GetVolumeWatcher(locator *api.VolumeLocator, labels map[string]string) (chan *api.Volume, error) {
return nil, nil
}

func (d *driver) StopVolumeWatcher() {
return
}

func (d *driver) Name() string {
return Name
}
Expand Down
8 changes: 8 additions & 0 deletions volume/drivers/vfs/vfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,18 @@ func Init(params map[string]string) (volume.VolumeDriver, error) {
}, nil
}

func (d *driver) StartVolumeWatcher() {
return
}

func (d *driver) GetVolumeWatcher(locator *api.VolumeLocator, labels map[string]string) (chan *api.Volume, error) {
return nil, nil
}

func (d *driver) StopVolumeWatcher() {
return
}

func (d *driver) Name() string {
return Name
}
Expand Down
4 changes: 4 additions & 0 deletions volume/volume.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,8 +311,12 @@ type Enumerator interface {

// Water provides a set of function to get volume
type Watcher interface {
// Stop Volume notifier
StartVolumeWatcher()
// Gets Volume notifier
GetVolumeWatcher(locator *api.VolumeLocator, labels map[string]string) (chan *api.Volume, error)
// Stop Volume notifier
StopVolumeWatcher()
}

// StoreEnumerator combines Store and Enumerator capabilities
Expand Down

0 comments on commit 8015fc0

Please sign in to comment.