Skip to content

Commit

Permalink
add correlation tracing to nsm filter Stats
Browse files Browse the repository at this point in the history
Signed-off-by: alice lyu <[email protected]>
  • Loading branch information
alicelyy committed Jan 18, 2024
1 parent d3226fa commit 21692f2
Show file tree
Hide file tree
Showing 629 changed files with 47,797 additions and 16,805 deletions.
66 changes: 64 additions & 2 deletions api/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,17 @@ func NewClient(host, version, userAgent string) (*Client, error) {
return nil, fmt.Errorf("Unable to parse provided url: %v", host)
}
c := &Client{
host: host,
tlsConfig: nil,
base: baseURL,
version: version,
httpClient: hClient,
authstring: "",
accesstoken: "",
userAgent: fmt.Sprintf("%v/%v", userAgent, version),
}
c.transport = hClient.Transport
hClient.Transport = c
return c, nil
}

Expand All @@ -55,13 +59,17 @@ func NewAuthClient(host, version, authstring, accesstoken, userAgent string) (*C
return nil, fmt.Errorf("Unable to parse provided url: %v", host)
}
c := &Client{
host: host,
tlsConfig: nil,
base: baseURL,
version: version,
httpClient: hClient,
authstring: authstring,
accesstoken: accesstoken,
userAgent: fmt.Sprintf("%v/%v", userAgent, version),
}
c.transport = hClient.Transport
hClient.Transport = c
return c, nil
}

Expand All @@ -79,6 +87,9 @@ func GetUnixServerPath(socketName string, paths ...string) string {
// Client is an HTTP REST wrapper. Use one of Get/Post/Put/Delete to get a request
// object.
type Client struct {
host string
tlsConfig *tls.Config
transport http.RoundTripper
base *url.URL
version string
httpClient *http.Client
Expand All @@ -92,9 +103,16 @@ func (c *Client) BaseURL() string {
}

func (c *Client) SetTLS(tlsConfig *tls.Config) {
transport := &http.Transport{TLSClientConfig: c.tlsConfig}

// re-assign transport layer defaults
c.tlsConfig = tlsConfig
c.transport = transport

c.httpClient = &http.Client{
Transport: &http.Transport{TLSClientConfig: tlsConfig},
Transport: c,
}

}

// Versions send a request at the /versions REST endpoint.
Expand All @@ -111,7 +129,8 @@ func (c *Client) Get() *Request {

// Post returns a Request object setup for POST call.
func (c *Client) Post() *Request {
return NewRequest(c.httpClient, c.base, http.MethodPost, c.version, c.authstring, c.userAgent)
r := NewRequest(c.httpClient, c.base, http.MethodPost, c.version, c.authstring, c.userAgent)
return r
}

// Put returns a Request object setup for PUT call.
Expand All @@ -138,6 +157,41 @@ func unix2HTTP(u *url.URL) {
}
}

// shouldRoundTripRetry
func (c *Client) shouldRoundTripRetry(res *http.Response, err error) bool {
if http.ErrHandlerTimeout == err || res == nil {
return true
}
return res.StatusCode == http.StatusRequestTimeout ||
res.StatusCode == http.StatusGatewayTimeout
}

// RoundTrip
// When creating the http client we cache the client against the host without any expiration
// when picking the client from cache for any further request
// The cached client resolves the IP that was assigned to the node prior to DHCP update
// A custom round tripper in the transport layer which can invalidate the cache and
// build a new http client in case of a timeout.
// Rebuilding the cache on timeout and retrying will resolve the new IP for that host.
func (c *Client) RoundTrip(req *http.Request) (res *http.Response, err error) {
res, err = c.transport.RoundTrip(req)

if c.shouldRoundTripRetry(res, err) {
retireHTTPClient(c.host)
c.httpClient = getHTTPClient(c.host)
if c.tlsConfig != nil {
c.SetTLS(c.tlsConfig)
} else {
c.transport = c.httpClient.Transport
c.httpClient.Transport = c
}

res, err = c.transport.RoundTrip(req)
}

return
}

func newHTTPClient(
u *url.URL,
tlsConfig *tls.Config,
Expand Down Expand Up @@ -166,6 +220,14 @@ func newHTTPClient(
return &http.Client{Transport: httpTransport, Timeout: responseTimeout}
}

func retireHTTPClient(host string) {
cacheLock.Lock()
defer cacheLock.Unlock()
if _, ok := httpCache[host]; ok {
delete(httpCache, host)
}
}

func getHTTPClient(host string) *http.Client {
cacheLock.Lock()
defer cacheLock.Unlock()
Expand Down
15 changes: 5 additions & 10 deletions api/client/volume/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,13 +169,13 @@ func (v *volumeClient) Status() [][2]string {

// Inspect specified volumes.
// Errors ErrEnoEnt may be returned.
func (v *volumeClient) Inspect(ids []string) ([]*api.Volume, error) {
if len(ids) == 0 {
func (v *volumeClient) Inspect(ctx context.Context, volumeIDs []string) ([]*api.Volume, error) {
if len(volumeIDs) == 0 {
return nil, nil
}
var volumes []*api.Volume
request := v.c.Get().Resource(volumePath)
for _, id := range ids {
for _, id := range volumeIDs {
request.QueryOption(api.OptVolumeID, id)
}
if err := request.Do().Unmarshal(&volumes); err != nil {
Expand Down Expand Up @@ -246,10 +246,7 @@ func (v *volumeClient) Restore(volumeID string, snapID string) error {

// Stats for specified volume.
// Errors ErrEnoEnt may be returned
func (v *volumeClient) Stats(
volumeID string,
cumulative bool,
) (*api.Stats, error) {
func (v *volumeClient) Stats(ctx context.Context, volumeID string, cumulative bool) (*api.Stats, error) {
stats := &api.Stats{}
req := v.c.Get().Resource(volumePath + "/stats").Instance(volumeID)
req.QueryOption(api.OptCumulative, strconv.FormatBool(cumulative))
Expand Down Expand Up @@ -306,9 +303,7 @@ func (v *volumeClient) CapacityUsage(
return requests, nil
}

func (v *volumeClient) VolumeUsageByNode(
nodeID string,
) (*api.VolumeUsageByNode, error) {
func (v *volumeClient) VolumeUsageByNode(ctx context.Context, nodeID string) (*api.VolumeUsageByNode, error) {

return nil, volume.ErrNotSupported

Expand Down
3 changes: 2 additions & 1 deletion api/client/volume/client_volume_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package volume

import (
"context"
"crypto/tls"
"encoding/json"
"net/http"
Expand All @@ -25,7 +26,7 @@ func TestClientTLS(t *testing.T) {

clnt.SetTLS(&tls.Config{InsecureSkipVerify: true})

_, err = VolumeDriver(clnt).Inspect([]string{"12345"})
_, err = VolumeDriver(clnt).Inspect(context.TODO(), []string{"12345"})

require.NoError(t, err)
}
8 changes: 4 additions & 4 deletions api/server/middleware_auth.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ func (a *authMiddleware) setWithAuth(w http.ResponseWriter, r *http.Request, nex
if err != nil {
processErrorForVolSetResponse(req.Action, err, &resp)
} else {
v, err := d.Inspect([]string{volumeID})
v, err := d.Inspect(correlation.TODO(), []string{volumeID})
if err != nil {
processErrorForVolSetResponse(req.Action, err, &resp)
} else if v == nil || len(v) != 1 {
Expand Down Expand Up @@ -279,7 +279,7 @@ func (a *authMiddleware) deleteWithAuth(w http.ResponseWriter, r *http.Request,
return
}

vols, err := d.Inspect([]string{volumeID})
vols, err := d.Inspect(correlation.TODO(), []string{volumeID})
if err != nil || len(vols) == 0 || vols[0] == nil {
json.NewEncoder(w).Encode(volumeResponse)
return
Expand Down Expand Up @@ -338,7 +338,7 @@ func (a *authMiddleware) inspectWithAuth(w http.ResponseWriter, r *http.Request,
return
}

dk, err := d.Inspect([]string{volumeID})
dk, err := d.Inspect(correlation.TODO(), []string{volumeID})
if err != nil {
a.log(volumeID, fn).WithError(err).Error("Failed to inspect volume")
http.Error(w, err.Error(), http.StatusNotFound)
Expand Down Expand Up @@ -368,7 +368,7 @@ func (a *authMiddleware) enumerateWithAuth(w http.ResponseWriter, r *http.Reques
}
volumeID := volIDs[0]

vols, err := d.Inspect([]string{volumeID})
vols, err := d.Inspect(correlation.TODO(), []string{volumeID})
if err != nil || len(vols) == 0 || vols[0] == nil {
a.log(volumeID, fn).WithError(err).Error("Failed to get volume object")
json.NewEncoder(w).Encode(emptyVols)
Expand Down
2 changes: 1 addition & 1 deletion api/server/sdk/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ func (s *NodeServer) VolumeUsageByNode(
if s.server.driver(ctx) == nil {
return nil, status.Error(codes.Unavailable, "Resource has not been initialized")
}
resp, err := s.server.driver(ctx).VolumeUsageByNode(req.GetNodeId())
resp, err := s.server.driver(ctx).VolumeUsageByNode(ctx, req.GetNodeId())
if err != nil {
return nil, status.Errorf(codes.Internal, " Failed to get VolumeUsageByNode :%v", err.Error())
}
Expand Down
5 changes: 3 additions & 2 deletions api/server/sdk/volume_ops.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/libopenstorage/openstorage/api"
"github.com/libopenstorage/openstorage/pkg/auth"
policy "github.com/libopenstorage/openstorage/pkg/storagepolicy"
"github.com/libopenstorage/openstorage/pkg/correlation"
"github.com/libopenstorage/openstorage/pkg/util"
"github.com/libopenstorage/openstorage/volume"
"github.com/portworx/kvdb"
Expand Down Expand Up @@ -501,7 +502,7 @@ func (s *VolumeServer) Inspect(
}
v = vols[0]
} else {
vols, err := s.driver(ctx).Inspect([]string{req.GetVolumeId()})
vols, err := s.driver(ctx).Inspect(correlation.TODO(), []string{req.GetVolumeId()})
if err == kvdb.ErrNotFound || (err == nil && len(vols) == 0) {
return nil, status.Errorf(
codes.NotFound,
Expand Down Expand Up @@ -754,7 +755,7 @@ func (s *VolumeServer) Stats(
return nil, err
}

stats, err := s.driver(ctx).Stats(req.GetVolumeId(), !req.GetNotCumulative())
stats, err := s.driver(ctx).Stats(ctx, req.GetVolumeId(), !req.GetNotCumulative())
if err != nil {
return nil, status.Errorf(
codes.Internal,
Expand Down
Loading

0 comments on commit 21692f2

Please sign in to comment.