Skip to content

Commit

Permalink
!feat: VEC-273 0.9.1 support
Browse files Browse the repository at this point in the history
- replace ScheduleDelay with Schedule
- replace merge.Parallel with merge.IndexParallel
- add merge.ReIndexParallel
  • Loading branch information
Jesse Schmidt committed Aug 29, 2024
1 parent b7ab11d commit 9efffd6
Show file tree
Hide file tree
Showing 9 changed files with 740 additions and 340 deletions.
67 changes: 32 additions & 35 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -613,7 +613,7 @@ func (c *Client) VectorSearchBool(ctx context.Context,
func (c *Client) WaitForIndexCompletion(
ctx context.Context,
namespace,
indexName string,
indexName string, // TODO: Converge all to name or indexName?
waitInterval time.Duration,
) error {
logger := c.logger.With(slog.String("namespace", namespace), slog.String("indexName", indexName))
Expand All @@ -625,11 +625,7 @@ func (c *Client) WaitForIndexCompletion(
return err
}

indexID := &protos.IndexId{
Namespace: namespace,
Name: indexName,
}

indexStatusReq := createIndexStatusRequest(namespace, indexName)
timer := time.NewTimer(waitInterval)
startTime := time.Now()
unmergedZeroCount := 0
Expand All @@ -638,7 +634,7 @@ func (c *Client) WaitForIndexCompletion(
defer timer.Stop()

for {
indexStatus, err := conn.indexClient.GetStatus(ctx, indexID)
indexStatus, err := conn.indexClient.GetStatus(ctx, indexStatusReq)
if err != nil {
logger.ErrorContext(ctx, "failed to wait for index completion", slog.Any("error", err))
return err
Expand Down Expand Up @@ -783,7 +779,11 @@ func (c *Client) IndexCreateFromIndexDef(
return NewAVSError(msg, err)
}

_, err = conn.indexClient.Create(ctx, indexDef)
indexCreateReq := &protos.IndexCreateRequest{
Definition: indexDef,
}

_, err = conn.indexClient.Create(ctx, indexCreateReq)
if err != nil {
msg := "failed to create index"
logger.Error(msg, slog.Any("error", err))
Expand Down Expand Up @@ -874,12 +874,14 @@ func (c *Client) IndexDrop(ctx context.Context, namespace, name string) error {
return NewAVSErrorFromGrpc(msg, err)
}

indexID := &protos.IndexId{
Namespace: namespace,
Name: name,
indexDropReq := &protos.IndexDropRequest{
IndexId: &protos.IndexId{
Namespace: namespace,
Name: name,
},
}

_, err = conn.indexClient.Drop(ctx, indexID)
_, err = conn.indexClient.Drop(ctx, indexDropReq)
if err != nil {
msg := "failed to drop index"

Expand All @@ -904,7 +906,7 @@ func (c *Client) IndexDrop(ctx context.Context, namespace, name string) error {
//
// *protos.IndexDefinitionList: A list of index definitions.
// error: An error if the list retrieval fails, otherwise nil.
func (c *Client) IndexList(ctx context.Context) (*protos.IndexDefinitionList, error) {
func (c *Client) IndexList(ctx context.Context, applyDefaults bool) (*protos.IndexDefinitionList, error) {
c.logger.DebugContext(ctx, "listing indexes")

conn, err := c.channelProvider.GetRandomConn()
Expand All @@ -916,7 +918,11 @@ func (c *Client) IndexList(ctx context.Context) (*protos.IndexDefinitionList, er
return nil, NewAVSErrorFromGrpc(msg, err)
}

indexList, err := conn.indexClient.List(ctx, nil)
indexListReq := &protos.IndexListRequest{
ApplyDefaults: &applyDefaults,
}

indexList, err := conn.indexClient.List(ctx, indexListReq)
if err != nil {
msg := "failed to get indexes"

Expand Down Expand Up @@ -952,12 +958,14 @@ func (c *Client) IndexGet(ctx context.Context, namespace, name string) (*protos.
return nil, NewAVSErrorFromGrpc(msg, err)
}

indexID := &protos.IndexId{
Namespace: namespace,
Name: name,
indexGetReq := &protos.IndexGetRequest{
IndexId: &protos.IndexId{
Namespace: namespace,
Name: name,
},
}

indexDef, err := conn.indexClient.Get(ctx, indexID)
indexDef, err := conn.indexClient.Get(ctx, indexGetReq)
if err != nil {
msg := "failed to get index"
logger.ErrorContext(ctx, msg, slog.Any("error", err))
Expand Down Expand Up @@ -992,12 +1000,9 @@ func (c *Client) IndexGetStatus(ctx context.Context, namespace, name string) (*p
return nil, NewAVSErrorFromGrpc(msg, err)
}

indexID := &protos.IndexId{
Namespace: namespace,
Name: name,
}
indexStatusReq := createIndexStatusRequest(namespace, name)

indexStatus, err := conn.indexClient.GetStatus(ctx, indexID)
indexStatus, err := conn.indexClient.GetStatus(ctx, indexStatusReq)
if err != nil {
msg := "failed to get index status"
logger.ErrorContext(ctx, msg, slog.Any("error", err))
Expand Down Expand Up @@ -1556,17 +1561,13 @@ func (c *Client) waitForIndexCreation(ctx context.Context,
return NewAVSErrorFromGrpc(msg, err)
}

indexID := &protos.IndexId{
Namespace: namespace,
Name: name,
}

indexStatusReq := createIndexStatusRequest(namespace, name)
timer := time.NewTimer(waitInterval)

defer timer.Stop()

for {
_, err := conn.indexClient.GetStatus(ctx, indexID)
_, err := conn.indexClient.GetStatus(ctx, indexStatusReq)
if err != nil {
code := status.Code(err)
if code == codes.Unavailable || code == codes.NotFound {
Expand Down Expand Up @@ -1611,17 +1612,13 @@ func (c *Client) waitForIndexDrop(ctx context.Context, namespace, name string, w
return NewAVSErrorFromGrpc(msg, err)
}

indexID := &protos.IndexId{
Namespace: namespace,
Name: name,
}

indexStatusReq := createIndexStatusRequest(namespace, name)
timer := time.NewTimer(waitInterval)

defer timer.Stop()

for {
_, err := conn.indexClient.GetStatus(ctx, indexID)
_, err := conn.indexClient.GetStatus(ctx, indexStatusReq)
if err != nil {
code := status.Code(err)
if code == codes.Unavailable || code == codes.NotFound {
Expand Down
1 change: 0 additions & 1 deletion protos/auth.proto
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,3 @@ service AuthService {
// Request authentication.
rpc Authenticate(AuthRequest) returns (AuthResponse) {}
}

Loading

0 comments on commit 9efffd6

Please sign in to comment.