Skip to content

Commit

Permalink
Merge branch 'main' into feature/index-job/create
Browse files Browse the repository at this point in the history
  • Loading branch information
hlts2 authored Oct 31, 2023
2 parents 51550ba + 22fe741 commit b1c2e50
Show file tree
Hide file tree
Showing 7 changed files with 1,612 additions and 9 deletions.
2 changes: 1 addition & 1 deletion charts/vald/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2676,7 +2676,7 @@ manager:
observability:
otlp:
attribute:
service_name: vald-manager-index
service_name: vald-index-correction
# @schema {"name": "manager.index.corrector.enabled", "type": "boolean"}
# manager.index.corrector.enabled -- enable index correction CronJob
enabled: false
Expand Down
39 changes: 39 additions & 0 deletions docs/user-guides/index-correction.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
# Index Correction

In the Vald cluster, the same Index is replicated to multiple agents due to the `index_replica` setting. However, inconsistencies between replicas may occur due to pod eviction or the occurrence of OOM killer during vector insertions. For example,

1. The timestamp of the index differs between agents (some agents have an old index saved and it has not been updated).
2. The number of replicas does not meet the value set in `index_replica`.

To resolve these inconsistencies, you can use the `Index Correction` feature.

`Index Correction` is implemented as a [`CronJob`](https://kubernetes.io/docs/concepts/workloads/controllers/cron-jobs/), checking the consistency between replicas regularly and resolving any inconsistencies.

## Settings

- enabled
Turns the index correction feature on/off.
- schedule
Sets the interval for the job start in cron notation (the default value is `3 6 * * *`, which means 3:06 AM every day).
- suspend
[Temporary suspension setting](https://kubernetes.io/docs/concepts/workloads/controllers/cron-jobs/#schedule-suspension) for CronJob.

```yaml
manager:
index:
corrector:
enabled: true
schedule: "3 6 * * *"
suspend: false
```
## Important Notes
- Processing time
Under conditions of 10 million identical vectors(not including `index_replica`) and 10 agent replicas, the processing takes about 30~40 minutes (this is only a reference, and the actual execution time may vary depending on the infrastructure). Time complexity of the process is `O(MN)` where M is the number of identical vector items and N is the number of agent replicas. `index_replica` does not matter for the processing time.

- concurrencyPolicy
`Forbid` is set internally, so a new job will not be created while an existing job is running. In other words, if the process does not finish within the interval specified by the schedule, the next job will not be scheduled.

- Index operations during correction
Vector operations performed after the start of the index correction job are not considered in that job.
121 changes: 121 additions & 0 deletions internal/observability/metrics/index/job/correction/correction.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
// Copyright (C) 2019-2023 vdaas.org vald team <[email protected]>
//
// Licensed under the Apache License, Version 2.0 (the "License");
// You may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package correction

import (
"context"

"github.com/vdaas/vald/internal/observability/metrics"
"github.com/vdaas/vald/pkg/index/job/correction/service"
"go.opentelemetry.io/otel/sdk/metric/aggregation"
"go.opentelemetry.io/otel/sdk/metric/view"
)

const (
checkedIndexCount = "index_job_correction_checked_index_count"
checkedIndexCountDesc = "The number of checked indexes while index correction job"

correctedOldIndexCount = "index_job_correction_corrected_old_index_count"
correctedOldIndexCountDesc = "The number of corrected old indexes while index correction job"

correctedReplicationCount = "index_job_correction_corrected_replication_count"
correctedReplicationCountDesc = "The number of operation happened to correct replication number while index correction job"
)

type correctionMetrics struct {
correction service.Corrector
}

func New(c service.Corrector) metrics.Metric {
return &correctionMetrics{
correction: c,
}
}

func (*correctionMetrics) View() ([]*metrics.View, error) {
checkedIndexCount, err := view.New(
view.MatchInstrumentName(checkedIndexCount),
view.WithSetDescription(checkedIndexCountDesc),
view.WithSetAggregation(aggregation.LastValue{}),
)
if err != nil {
return nil, err
}

oldIndexCount, err := view.New(
view.MatchInstrumentName(correctedOldIndexCount),
view.WithSetDescription(correctedOldIndexCountDesc),
view.WithSetAggregation(aggregation.LastValue{}),
)
if err != nil {
return nil, err
}

replicationCount, err := view.New(
view.MatchInstrumentName(correctedReplicationCount),
view.WithSetDescription(correctedReplicationCountDesc),
view.WithSetAggregation(aggregation.LastValue{}),
)
if err != nil {
return nil, err
}

return []*metrics.View{
&checkedIndexCount,
&oldIndexCount,
&replicationCount,
}, nil
}

func (c *correctionMetrics) Register(m metrics.Meter) error {
checkedIndexCount, err := m.AsyncInt64().Gauge(
checkedIndexCount,
metrics.WithDescription(checkedIndexCountDesc),
metrics.WithUnit(metrics.Dimensionless),
)
if err != nil {
return err
}

oldIndexCount, err := m.AsyncInt64().Gauge(
correctedOldIndexCount,
metrics.WithDescription(correctedOldIndexCountDesc),
metrics.WithUnit(metrics.Dimensionless),
)
if err != nil {
return err
}

replicationCount, err := m.AsyncInt64().Gauge(
correctedReplicationCount,
metrics.WithDescription(correctedReplicationCountDesc),
metrics.WithUnit(metrics.Dimensionless),
)
if err != nil {
return err
}

return m.RegisterCallback(
[]metrics.AsynchronousInstrument{
checkedIndexCount,
oldIndexCount,
replicationCount,
},
func(ctx context.Context) {
checkedIndexCount.Observe(ctx, int64(c.correction.NumberOfCheckedIndex()))
oldIndexCount.Observe(ctx, int64(c.correction.NumberOfCorrectedOldIndex()))
replicationCount.Observe(ctx, int64(c.correction.NumberOfCorrectedReplication()))
},
)
}
Loading

0 comments on commit b1c2e50

Please sign in to comment.