Skip to content

Commit

Permalink
Vec-263 Vec-269 Fix tend and add cluster/node metadata methods (#9)
Browse files Browse the repository at this point in the history
* fix: VEC-262 client fails to tend cluster and falls back to seed nodes

* feat: VEC-269 check avs node version and warn

* fix: Channel closed race condition
  • Loading branch information
Jesse S authored Aug 6, 2024
1 parent c369f22 commit 9fc7010
Show file tree
Hide file tree
Showing 4 changed files with 498 additions and 47 deletions.
221 changes: 203 additions & 18 deletions admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,12 @@ import (
"context"
"crypto/tls"
"log/slog"
"strconv"
"strings"
"time"

"github.com/aerospike/avs-client-go/protos"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/emptypb"
Expand Down Expand Up @@ -44,7 +47,7 @@ func NewAdminClient(
logger *slog.Logger,
) (*AdminClient, error) {
logger = logger.WithGroup("avs.admin")
logger.Debug("creating new client")
logger.Info("creating new client")

channelProvider, err := newChannelProvider(
ctx,
Expand Down Expand Up @@ -160,7 +163,7 @@ func (c *AdminClient) IndexCreateFromIndexDef(
logger := c.logger.With(slog.Any("definition", indexDef))
logger.InfoContext(ctx, "creating index from definition")

conn, err := c.channelProvider.GetConn()
conn, err := c.channelProvider.GetRandomConn()
if err != nil {
msg := "failed to create index from definition"
logger.Error(msg, slog.Any("error", err))
Expand Down Expand Up @@ -197,7 +200,7 @@ func (c *AdminClient) IndexUpdate(

logger.InfoContext(ctx, "updating index")

conn, err := c.channelProvider.GetConn()
conn, err := c.channelProvider.GetRandomConn()
if err != nil {
msg := "failed to update index"
logger.Error(msg, slog.Any("error", err))
Expand Down Expand Up @@ -234,7 +237,7 @@ func (c *AdminClient) IndexDrop(ctx context.Context, namespace, name string) err
logger := c.logger.With(slog.String("namespace", namespace), slog.String("name", name))
logger.InfoContext(ctx, "dropping index")

conn, err := c.channelProvider.GetConn()
conn, err := c.channelProvider.GetRandomConn()
if err != nil {
msg := "failed to drop index"
logger.Error(msg, slog.Any("error", err))
Expand Down Expand Up @@ -269,7 +272,7 @@ func (c *AdminClient) IndexDrop(ctx context.Context, namespace, name string) err
func (c *AdminClient) IndexList(ctx context.Context) (*protos.IndexDefinitionList, error) {
c.logger.InfoContext(ctx, "listing indexes")

conn, err := c.channelProvider.GetConn()
conn, err := c.channelProvider.GetRandomConn()
if err != nil {
msg := "failed to get indexes"

Expand Down Expand Up @@ -298,7 +301,7 @@ func (c *AdminClient) IndexGet(ctx context.Context, namespace, name string) (*pr
logger := c.logger.With(slog.String("namespace", namespace), slog.String("name", name))
logger.InfoContext(ctx, "getting index")

conn, err := c.channelProvider.GetConn()
conn, err := c.channelProvider.GetRandomConn()
if err != nil {
msg := "failed to get index"
logger.ErrorContext(ctx, msg, slog.Any("error", err))
Expand Down Expand Up @@ -328,7 +331,7 @@ func (c *AdminClient) IndexGetStatus(ctx context.Context, namespace, name string
logger := c.logger.With(slog.String("namespace", namespace), slog.String("name", name))
logger.InfoContext(ctx, "getting index status")

conn, err := c.channelProvider.GetConn()
conn, err := c.channelProvider.GetRandomConn()
if err != nil {
msg := "failed to get index status"
logger.ErrorContext(ctx, msg, slog.Any("error", err))
Expand Down Expand Up @@ -363,7 +366,7 @@ func (c *AdminClient) GcInvalidVertices(ctx context.Context, namespace, name str

logger.InfoContext(ctx, "garbage collection invalid vertices")

conn, err := c.channelProvider.GetConn()
conn, err := c.channelProvider.GetRandomConn()
if err != nil {
msg := "failed to garbage collect invalid vertices"
logger.ErrorContext(ctx, msg, slog.Any("error", err))
Expand Down Expand Up @@ -396,7 +399,7 @@ func (c *AdminClient) CreateUser(ctx context.Context, username, password string,
logger := c.logger.With(slog.String("username", username), slog.Any("roles", roles))
logger.InfoContext(ctx, "creating user")

conn, err := c.channelProvider.GetConn()
conn, err := c.channelProvider.GetRandomConn()
if err != nil {
msg := "failed to create user"
logger.ErrorContext(ctx, msg, slog.Any("error", err))
Expand Down Expand Up @@ -427,7 +430,7 @@ func (c *AdminClient) UpdateCredentials(ctx context.Context, username, password
logger := c.logger.With(slog.String("username", username))
logger.InfoContext(ctx, "updating user credentials")

conn, err := c.channelProvider.GetConn()
conn, err := c.channelProvider.GetRandomConn()
if err != nil {
msg := "failed to update user credentials"
logger.ErrorContext(ctx, msg, slog.Any("error", err))
Expand Down Expand Up @@ -457,7 +460,7 @@ func (c *AdminClient) DropUser(ctx context.Context, username string) error {
logger := c.logger.With(slog.String("username", username))
logger.InfoContext(ctx, "dropping user")

conn, err := c.channelProvider.GetConn()
conn, err := c.channelProvider.GetRandomConn()
if err != nil {
msg := "failed to drop user"
logger.ErrorContext(ctx, msg, slog.Any("error", err))
Expand Down Expand Up @@ -487,7 +490,7 @@ func (c *AdminClient) GetUser(ctx context.Context, username string) (*protos.Use
logger := c.logger.With(slog.String("username", username))
logger.InfoContext(ctx, "getting user")

conn, err := c.channelProvider.GetConn()
conn, err := c.channelProvider.GetRandomConn()
if err != nil {
msg := "failed to get user"
logger.ErrorContext(ctx, msg, slog.Any("error", err))
Expand Down Expand Up @@ -516,7 +519,7 @@ func (c *AdminClient) GetUser(ctx context.Context, username string) (*protos.Use
func (c *AdminClient) ListUsers(ctx context.Context) (*protos.ListUsersResponse, error) {
c.logger.InfoContext(ctx, "listing users")

conn, err := c.channelProvider.GetConn()
conn, err := c.channelProvider.GetRandomConn()
if err != nil {
msg := "failed to list users"
c.logger.ErrorContext(ctx, msg, slog.Any("error", err))
Expand All @@ -542,7 +545,7 @@ func (c *AdminClient) GrantRoles(ctx context.Context, username string, roles []s
logger := c.logger.With(slog.String("username", username), slog.Any("roles", roles))
logger.InfoContext(ctx, "granting user roles")

conn, err := c.channelProvider.GetConn()
conn, err := c.channelProvider.GetRandomConn()
if err != nil {
msg := "failed to grant user roles"
logger.ErrorContext(ctx, msg, slog.Any("error", err))
Expand Down Expand Up @@ -573,7 +576,7 @@ func (c *AdminClient) RevokeRoles(ctx context.Context, username string, roles []
logger := c.logger.With(slog.String("username", username), slog.Any("roles", roles))
logger.InfoContext(ctx, "revoking user roles")

conn, err := c.channelProvider.GetConn()
conn, err := c.channelProvider.GetRandomConn()
if err != nil {
msg := "failed to revoke user roles"
logger.ErrorContext(ctx, msg, slog.Any("error", err))
Expand Down Expand Up @@ -603,7 +606,7 @@ func (c *AdminClient) RevokeRoles(ctx context.Context, username string, roles []
func (c *AdminClient) ListRoles(ctx context.Context) (*protos.ListRolesResponse, error) {
c.logger.InfoContext(ctx, "listing roles")

conn, err := c.channelProvider.GetConn()
conn, err := c.channelProvider.GetRandomConn()
if err != nil {
msg := "failed to list roles"
c.logger.ErrorContext(ctx, msg, slog.Any("error", err))
Expand All @@ -624,6 +627,188 @@ func (c *AdminClient) ListRoles(ctx context.Context) (*protos.ListRolesResponse,
return rolesResp, nil
}

// NodeIDs returns the IDs of every node the client is currently connected
// to. A node that is reachable but not part of the cluster is not included.
func (c *AdminClient) NodeIDs(ctx context.Context) []*protos.NodeId {
	c.logger.InfoContext(ctx, "getting cluster info")

	rawIDs := c.channelProvider.GetNodeIDs()
	nodeIDs := make([]*protos.NodeId, 0, len(rawIDs))

	for _, rawID := range rawIDs {
		nodeIDs = append(nodeIDs, &protos.NodeId{Id: rawID})
	}

	c.logger.Debug("got node ids", slog.Any("nodeIDs", nodeIDs))

	return nodeIDs
}

// ConnectedNodeEndpoint returns the endpoint used to connect to a node. If
// nodeID is nil then an endpoint used to connect to your seed (or
// load-balancer) is used.
func (c *AdminClient) ConnectedNodeEndpoint(
	ctx context.Context,
	nodeID *protos.NodeId,
) (*protos.ServerEndpoint, error) {
	c.logger.InfoContext(ctx, "getting connected endpoint for node", slog.Any("nodeID", nodeID))

	var (
		conn *grpc.ClientConn
		err  error
	)

	// nil nodeID means "whatever the seed/load-balancer connection points at".
	if nodeID == nil {
		conn, err = c.channelProvider.GetSeedConn()
	} else {
		conn, err = c.channelProvider.GetNodeConn(nodeID.GetId())
	}

	if err != nil {
		msg := "failed to get connected endpoint"
		c.logger.ErrorContext(ctx, msg, slog.Any("error", err))

		// Wrap the underlying error (matches the sibling cluster-info
		// methods) instead of discarding it.
		return nil, NewAVSErrorFromGrpc(msg, err)
	}

	// Split host/port on the LAST colon so targets containing extra colons
	// (e.g. bracketed IPv6 "[::1]:5000" or scheme-prefixed gRPC targets)
	// keep their full address part intact. strings.Split(target, ":")[0]
	// would truncate those.
	target := conn.Target()
	address, portStr := target, ""

	if idx := strings.LastIndex(target, ":"); idx >= 0 {
		address, portStr = target[:idx], target[idx+1:]
	}

	resp := protos.ServerEndpoint{
		Address: address,
	}

	if portStr != "" {
		port, err := strconv.ParseUint(portStr, 10, 32)
		if err != nil {
			msg := "failed to parse port"
			c.logger.ErrorContext(ctx, msg, slog.Any("error", err))

			return nil, NewAVSErrorFromGrpc(msg, err)
		}

		resp.Port = uint32(port)
	}

	return &resp, nil
}

// ClusteringState returns the state of the cluster according to the
// given node. If nodeID is nil then the seed node is used.
func (c *AdminClient) ClusteringState(ctx context.Context, nodeID *protos.NodeId) (*protos.ClusteringState, error) {
	c.logger.InfoContext(ctx, "getting clustering state for node", slog.Any("nodeID", nodeID))

	var (
		conn *grpc.ClientConn
		err  error
	)

	// nil nodeID means "whatever the seed/load-balancer connection points at".
	if nodeID == nil {
		conn, err = c.channelProvider.GetSeedConn()
	} else {
		conn, err = c.channelProvider.GetNodeConn(nodeID.GetId())
	}

	if err != nil {
		// Previously said "failed to list roles" — a copy/paste from
		// ListRoles that made failures here very confusing to debug.
		msg := "failed to get clustering state"
		c.logger.ErrorContext(ctx, msg, slog.Any("error", err))

		return nil, NewAVSErrorFromGrpc(msg, err)
	}

	client := protos.NewClusterInfoServiceClient(conn)

	state, err := client.GetClusteringState(ctx, &emptypb.Empty{})
	if err != nil {
		msg := "failed to get clustering state"
		c.logger.ErrorContext(ctx, msg, slog.Any("error", err))

		return nil, NewAVSErrorFromGrpc(msg, err)
	}

	return state, nil
}

// ClusterEndpoints returns the endpoints of all the nodes in the cluster
// according to the specified node. If nodeID is nil then the seed node is used.
// If listenerName is nil then the default listener name is used.
func (c *AdminClient) ClusterEndpoints(
	ctx context.Context,
	nodeID *protos.NodeId,
	listenerName *string,
) (*protos.ClusterNodeEndpoints, error) {
	c.logger.InfoContext(ctx, "getting cluster endpoints for node", slog.Any("nodeID", nodeID))

	var (
		conn *grpc.ClientConn
		err  error
	)

	// A nil nodeID targets the seed (or load-balancer) connection.
	switch {
	case nodeID == nil:
		conn, err = c.channelProvider.GetSeedConn()
	default:
		conn, err = c.channelProvider.GetNodeConn(nodeID.GetId())
	}

	if err != nil {
		msg := "failed to get cluster endpoints"
		c.logger.ErrorContext(ctx, msg, slog.Any("error", err))

		return nil, NewAVSErrorFromGrpc(msg, err)
	}

	req := &protos.ClusterNodeEndpointsRequest{ListenerName: listenerName}

	endpoints, err := protos.NewClusterInfoServiceClient(conn).GetClusterEndpoints(ctx, req)
	if err != nil {
		msg := "failed to get cluster endpoints"
		c.logger.ErrorContext(ctx, msg, slog.Any("error", err))

		return nil, NewAVSErrorFromGrpc(msg, err)
	}

	return endpoints, nil
}

// About returns information about the provided node. If nodeID is nil
// then the seed node is used.
func (c *AdminClient) About(ctx context.Context, nodeID *protos.NodeId) (*protos.AboutResponse, error) {
	// Include the target node in the log line for parity with
	// ClusteringState/ClusterEndpoints; previously this was the only
	// per-node method that did not say which node it was querying.
	c.logger.InfoContext(ctx, "getting \"about\" info from nodes", slog.Any("nodeID", nodeID))

	var (
		conn *grpc.ClientConn
		err  error
	)

	// nil nodeID means "whatever the seed/load-balancer connection points at".
	if nodeID == nil {
		conn, err = c.channelProvider.GetSeedConn()
	} else {
		conn, err = c.channelProvider.GetNodeConn(nodeID.GetId())
	}

	if err != nil {
		msg := "failed to make about request"
		c.logger.ErrorContext(ctx, msg, slog.Any("error", err))

		return nil, NewAVSErrorFromGrpc(msg, err)
	}

	client := protos.NewAboutServiceClient(conn)

	resp, err := client.Get(ctx, &protos.AboutRequest{})
	if err != nil {
		msg := "failed to make about request"
		c.logger.ErrorContext(ctx, msg, slog.Any("error", err))

		return nil, NewAVSErrorFromGrpc(msg, err)
	}

	return resp, nil
}

// waitForIndexCreation waits for an index to be created and blocks until it is.
// The amount of time to wait between each call is defined by waitInterval.
func (c *AdminClient) waitForIndexCreation(ctx context.Context,
Expand All @@ -633,7 +818,7 @@ func (c *AdminClient) waitForIndexCreation(ctx context.Context,
) error {
logger := c.logger.With(slog.String("namespace", namespace), slog.String("name", name))

conn, err := c.channelProvider.GetConn()
conn, err := c.channelProvider.GetRandomConn()
if err != nil {
msg := "failed to wait for index creation"
logger.Error(msg, slog.Any("error", err))
Expand Down Expand Up @@ -689,7 +874,7 @@ func (c *AdminClient) waitForIndexCreation(ctx context.Context,
func (c *AdminClient) waitForIndexDrop(ctx context.Context, namespace, name string, waitInterval time.Duration) error {
logger := c.logger.With(slog.String("namespace", namespace), slog.String("name", name))

conn, err := c.channelProvider.GetConn()
conn, err := c.channelProvider.GetRandomConn()
if err != nil {
msg := "failed to wait for index deletion"
logger.Error(msg, slog.Any("error", err))
Expand Down
Loading

0 comments on commit 9fc7010

Please sign in to comment.