Skip to content

Commit

Permalink
Add fillVectorField when there is none
Browse files Browse the repository at this point in the history
  • Loading branch information
ykadowak committed Nov 14, 2023
1 parent 4335723 commit 0cea143
Showing 1 changed file with 37 additions and 8 deletions.
45 changes: 37 additions & 8 deletions pkg/index/job/correction/service/corrector.go
Original file line number Diff line number Diff line change
Expand Up @@ -359,8 +359,6 @@ func (c *correct) checkConsistency(ctx context.Context, targetReplica *vectorRep
vec: &payload.Object_Vector{
Id: vecMeta.GetId(),
Timestamp: vecMeta.GetTimestamp(),
// FIXME: probably should change the interface of foundReplicas
// vector itself is not acuired at this point
},

Check warning on line 362 in pkg/index/job/correction/service/corrector.go

View check run for this annotation

Codecov / codecov/patch

pkg/index/job/correction/service/corrector.go#L359-L362

Added lines #L359 - L362 were not covered by tests
})
mu.Unlock()
Expand Down Expand Up @@ -415,7 +413,7 @@ func (c *correct) correctTimestamp(ctx context.Context, targetReplica *vectorRep
latest.vec.GetTimestamp(),
)
c.correctedOldIndexCount.Add(1)
if err := c.updateObject(ctx, replica.addr, latest.vec); err != nil {
if err := c.updateObject(ctx, replica, latest); err != nil {
return err
}
}
Expand Down Expand Up @@ -508,18 +506,25 @@ func (c *correct) correctReplica(
return nil
}

func (c *correct) updateObject(ctx context.Context, addr string, vector *payload.Object_Vector) error {
func (c *correct) updateObject(ctx context.Context, dest, src *vectorReplica) error {
// check if the src vector has content not just timestamp
if vec := src.vec.GetVector(); len(vec) == 0 {
if err := c.fillVectorField(ctx, src); err != nil {
return err
}

Check warning on line 514 in pkg/index/job/correction/service/corrector.go

View check run for this annotation

Codecov / codecov/patch

pkg/index/job/correction/service/corrector.go#L513-L514

Added lines #L513 - L514 were not covered by tests
}

res, err := c.discoverer.GetClient().
Do(grpc.WithGRPCMethod(ctx, updateMethod), addr, func(ctx context.Context, conn *grpc.ClientConn, copts ...grpc.CallOption) (interface{}, error) {
Do(grpc.WithGRPCMethod(ctx, updateMethod), dest.addr, func(ctx context.Context, conn *grpc.ClientConn, copts ...grpc.CallOption) (interface{}, error) {
// TODO: use UpdateTimestamp when it's implemented because here we just want to update only the timestamp but not the vector
return vald.NewUpdateClient(conn).Update(ctx, &payload.Update_Request{
Vector: vector,
Vector: src.vec,

Check warning on line 521 in pkg/index/job/correction/service/corrector.go

View check run for this annotation

Codecov / codecov/patch

pkg/index/job/correction/service/corrector.go#L521

Added line #L521 was not covered by tests
// TODO: this should be deleted after Config.Timestamp deprecation
Config: &payload.Update_Config{
// TODO: Decrementing because it's gonna be incremented befor being pushed
// to vqueue in the agent. This is a not ideal workaround for the current vqueue implementation
// so we should consider refactoring vqueue.
Timestamp: vector.GetTimestamp() - 1,
Timestamp: src.vec.GetTimestamp() - 1,

Check warning on line 527 in pkg/index/job/correction/service/corrector.go

View check run for this annotation

Codecov / codecov/patch

pkg/index/job/correction/service/corrector.go#L527

Added line #L527 was not covered by tests
},
}, copts...)
})
Expand All @@ -528,7 +533,31 @@ func (c *correct) updateObject(ctx context.Context, addr string, vector *payload
}

if v, ok := res.(*payload.Object_Location); ok {
log.Infof("vector successfully updated. address: %s, uuid: %v", addr, v.GetUuid())
log.Infof("vector successfully updated. address: %s, uuid: %v", dest.addr, v.GetUuid())
}

Check warning on line 537 in pkg/index/job/correction/service/corrector.go

View check run for this annotation

Codecov / codecov/patch

pkg/index/job/correction/service/corrector.go#L536-L537

Added lines #L536 - L537 were not covered by tests

return nil
}

func (c *correct) fillVectorField(ctx context.Context, replica *vectorReplica) error {
res, err := c.discoverer.GetClient().
Do(grpc.WithGRPCMethod(ctx, "core.v1.Vald/GetObject"), replica.addr, func(ctx context.Context, conn *grpc.ClientConn, copts ...grpc.CallOption) (interface{}, error) {
return vald.NewValdClient(conn).GetObject(ctx, &payload.Object_VectorRequest{
Id: &payload.Object_ID{
Id: replica.vec.GetId(),
},
}, copts...)
})

Check warning on line 550 in pkg/index/job/correction/service/corrector.go

View check run for this annotation

Codecov / codecov/patch

pkg/index/job/correction/service/corrector.go#L545-L550

Added lines #L545 - L550 were not covered by tests
if err != nil {
return err
}

Check warning on line 553 in pkg/index/job/correction/service/corrector.go

View check run for this annotation

Codecov / codecov/patch

pkg/index/job/correction/service/corrector.go#L552-L553

Added lines #L552 - L553 were not covered by tests

if v, ok := res.(*payload.Object_Vector); ok {
vec := v.GetVector()
if len(vec) == 0 {
return err
}
replica.vec.Vector = v.GetVector()

Check warning on line 560 in pkg/index/job/correction/service/corrector.go

View check run for this annotation

Codecov / codecov/patch

pkg/index/job/correction/service/corrector.go#L556-L560

Added lines #L556 - L560 were not covered by tests
}

return nil
Expand Down

0 comments on commit 0cea143

Please sign in to comment.