Skip to content

Commit

Permalink
Extend ShuffleSharding on READONLY ingesters (#6517)
Browse files Browse the repository at this point in the history
* Filter readOnly ingesters when sharding

Signed-off-by: Daniel Deluiggi <[email protected]>

* Extend shard on READONLY

Signed-off-by: Daniel Deluiggi <[email protected]>

* Remove old code

Signed-off-by: Daniel Deluiggi <[email protected]>

* Fix test

Signed-off-by: Daniel Deluiggi <[email protected]>

* update changelog

Signed-off-by: Daniel Deluiggi <[email protected]>

---------

Signed-off-by: Daniel Deluiggi <[email protected]>
  • Loading branch information
danielblando authored Jan 30, 2025
1 parent 4b32f29 commit b48f93b
Show file tree
Hide file tree
Showing 8 changed files with 235 additions and 8 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
## master / unreleased

* [FEATURE] Querier/Ruler: Add `query_partial_data` and `rules_partial_data` limits to allow queries/rules to be evaluated with data from a single zone, if other zones are not available. #6526
* [BUGFIX] Ingester: Avoid error or early throttling when READONLY ingesters are present in the ring #6517

## 1.19.0 in progress

Expand Down
2 changes: 1 addition & 1 deletion pkg/ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -3122,7 +3122,7 @@ func (i *Ingester) flushHandler(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusNoContent)
}

// ModeHandler Change mode of ingester. It will also update set unregisterOnShutdown to true if READONLY mode
// ModeHandler Change mode of ingester.
func (i *Ingester) ModeHandler(w http.ResponseWriter, r *http.Request) {
err := r.ParseForm()
if err != nil {
Expand Down
6 changes: 6 additions & 0 deletions pkg/ring/lifecycler.go
Original file line number Diff line number Diff line change
Expand Up @@ -1005,6 +1005,12 @@ func (i *Lifecycler) changeState(ctx context.Context, state InstanceState) error

level.Info(i.logger).Log("msg", "changing instance state from", "old_state", currState, "new_state", state, "ring", i.RingName)
i.setState(state)

// The instance is rejoining the ring, so it should reset its registered time.
if currState == READONLY && state == ACTIVE {
registeredAt := time.Now()
i.setRegisteredAt(registeredAt)
}
return i.updateConsul(ctx)
}

Expand Down
80 changes: 80 additions & 0 deletions pkg/ring/lifecycler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -827,6 +827,86 @@ func TestTokenFileOnDisk(t *testing.T) {
}
}

// TestRegisteredAtOnBackToActive verifies that when a lifecycler goes from
// ACTIVE to READONLY and back to ACTIVE, the RegisteredTimestamp it publishes
// to the ring is newer than the one recorded when it first joined.
func TestRegisteredAtOnBackToActive(t *testing.T) {
	ringStore, closer := consul.NewInMemoryClient(GetCodec(), log.NewNopLogger(), nil)
	t.Cleanup(func() { assert.NoError(t, closer.Close()) })

	var ringConfig Config
	flagext.DefaultValues(&ringConfig)
	ringConfig.KVStore.Mock = ringStore

	r, err := New(ringConfig, "ingester", ringKey, log.NewNopLogger(), nil)
	require.NoError(t, err)
	require.NoError(t, services.StartAndAwaitRunning(context.Background(), r))
	defer services.StopAndAwaitTerminated(context.Background(), r) //nolint:errcheck

	tokenDir := t.TempDir()

	lifecyclerConfig := testLifecyclerConfig(ringConfig, "ing1")
	lifecyclerConfig.NumTokens = 512
	lifecyclerConfig.TokensFilePath = tokenDir + "/tokens"

	// Start first ingester.
	l1, err := NewLifecycler(lifecyclerConfig, &noopFlushTransferer{}, "ingester", ringKey, true, true, log.NewNopLogger(), nil)
	require.NoError(t, err)
	require.NoError(t, services.StartAndAwaitRunning(context.Background(), l1))
	// Deferred so the lifecycler is stopped even if an assertion below aborts
	// the test. Defers run LIFO, so l1 still stops before the ring does.
	defer func() {
		assert.NoError(t, services.StopAndAwaitTerminated(context.Background(), l1))
	}()

	// readRing fetches the current ring descriptor from the KV store.
	readRing := func() (*Desc, bool) {
		d, err := r.KVClient.Get(context.Background(), ringKey)
		require.NoError(t, err)

		desc, ok := d.(*Desc)
		return desc, ok
	}

	// waitForState polls the KV store until "ing1" reports the given state.
	waitForState := func(state InstanceState) {
		t.Helper()
		test.Poll(t, 1000*time.Millisecond, true, func() interface{} {
			desc, ok := readRing()
			return ok && desc.Ingesters["ing1"].State == state
		})
	}

	// Check this ingester joined, is active, and is the only one in the ring.
	test.Poll(t, 1000*time.Millisecond, true, func() interface{} {
		desc, ok := readRing()
		return ok &&
			len(desc.Ingesters) == 1 &&
			desc.Ingesters["ing1"].State == ACTIVE
	})

	// Get the original registered timestamp.
	desc, ok := readRing()
	require.True(t, ok)
	originalRegisterTime := desc.Ingesters["ing1"].RegisteredTimestamp

	// Change state from ACTIVE to READONLY.
	require.NoError(t, l1.ChangeState(context.Background(), READONLY))
	waitForState(READONLY)

	// Guarantee a 1s difference for RegisteredTimestamp
	// (presumably stored with one-second resolution — TODO confirm).
	time.Sleep(1 * time.Second)

	// Change state from READONLY back to ACTIVE.
	require.NoError(t, l1.ChangeState(context.Background(), ACTIVE))
	waitForState(ACTIVE)

	desc, ok = readRing()
	require.True(t, ok)
	ing := desc.Ingesters["ing1"]
	require.Greater(t, ing.RegisteredTimestamp, originalRegisterTime)
}

func TestTokenFileOnDisk_WithoutAutoJoinOnStartup(t *testing.T) {
ringStore, closer := consul.NewInMemoryClient(GetCodec(), log.NewNopLogger(), nil)
t.Cleanup(func() { assert.NoError(t, closer.Close()) })
Expand Down
19 changes: 14 additions & 5 deletions pkg/ring/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -543,10 +543,11 @@ type CompareResult int
// Values that RingCompare can return.
const (
Equal CompareResult = iota // Both rings contain the exact same instances.
EqualButStatesAndTimestamps // Both rings contain the same instances with the same data, except that states and timestamps may differ.
EqualButReadOnly // Both rings contain the same instances, but an instance's state differs with READONLY on one side of the change, so the write ring can change.
Different // Rings have different sets of instances, or their information doesn't match.
)

// RingCompare compares this ring against another one and returns one of Equal, EqualButStatesAndTimestamps or Different.
// RingCompare compares this ring against another one and returns one of Equal, EqualButStatesAndTimestamps, EqualButReadOnly or Different.
func (d *Desc) RingCompare(o *Desc) CompareResult {
if d == nil {
if o == nil || len(o.Ingesters) == 0 {
Expand All @@ -566,6 +567,7 @@ func (d *Desc) RingCompare(o *Desc) CompareResult {
}

equalStatesAndTimestamps := true
equalReadOnly := true

for name, ing := range d.Ingesters {
oing, ok := o.Ingesters[name]
Expand Down Expand Up @@ -600,14 +602,21 @@ func (d *Desc) RingCompare(o *Desc) CompareResult {
}

if ing.State != oing.State {
equalStatesAndTimestamps = false
if ing.State == READONLY || oing.State == READONLY {
equalReadOnly = false
} else {
equalStatesAndTimestamps = false
}
}
}

if equalStatesAndTimestamps {
return Equal
if !equalReadOnly {
return EqualButReadOnly
}
if !equalStatesAndTimestamps {
return EqualButStatesAndTimestamps
}
return EqualButStatesAndTimestamps
return Equal
}

func GetOrCreateRingDesc(d interface{}) *Desc {
Expand Down
20 changes: 20 additions & 0 deletions pkg/ring/model_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -395,6 +395,21 @@ func TestDesc_RingsCompare(t *testing.T) {
r2: &Desc{Ingesters: map[string]InstanceDesc{"ing1": {Addr: "addr1"}}},
expected: Equal,
},
"same number of instances, from active to readOnly": {
r1: &Desc{Ingesters: map[string]InstanceDesc{"ing1": {Addr: "addr1", State: ACTIVE}}},
r2: &Desc{Ingesters: map[string]InstanceDesc{"ing1": {Addr: "addr1", State: READONLY}}},
expected: EqualButReadOnly,
},
"same number of instances, from readOnly to active": {
r1: &Desc{Ingesters: map[string]InstanceDesc{"ing1": {Addr: "addr1", State: READONLY}}},
r2: &Desc{Ingesters: map[string]InstanceDesc{"ing1": {Addr: "addr1", State: ACTIVE}}},
expected: EqualButReadOnly,
},
"same number of instances, prioritize readOnly than timestamp changes": {
r1: &Desc{Ingesters: map[string]InstanceDesc{"ing1": {Addr: "addr1", State: ACTIVE, Timestamp: 123456}}},
r2: &Desc{Ingesters: map[string]InstanceDesc{"ing1": {Addr: "addr1", State: READONLY, Timestamp: 789012}}},
expected: EqualButReadOnly,
},
"same single instance, different timestamp": {
r1: &Desc{Ingesters: map[string]InstanceDesc{"ing1": {Addr: "addr1", Timestamp: 123456}}},
r2: &Desc{Ingesters: map[string]InstanceDesc{"ing1": {Addr: "addr1", Timestamp: 789012}}},
Expand Down Expand Up @@ -440,6 +455,11 @@ func TestDesc_RingsCompare(t *testing.T) {
r2: &Desc{Ingesters: map[string]InstanceDesc{"ing2": {Addr: "addr1", Tokens: []uint32{1, 2, 3}}}},
expected: Different,
},
"same number of instances, prioritize diff than ReadOnly": {
r1: &Desc{Ingesters: map[string]InstanceDesc{"ing1": {Addr: "addr1", Zone: "one", State: ACTIVE}}},
r2: &Desc{Ingesters: map[string]InstanceDesc{"ing1": {Addr: "addr1", Zone: "two", State: READONLY}}},
expected: Different,
},
}

for testName, testData := range tests {
Expand Down
10 changes: 8 additions & 2 deletions pkg/ring/ring.go
Original file line number Diff line number Diff line change
Expand Up @@ -333,12 +333,16 @@ func (r *Ring) updateRingState(ringDesc *Desc) {
}

rc := prevRing.RingCompare(ringDesc)
if rc == Equal || rc == EqualButStatesAndTimestamps {
if rc == Equal || rc == EqualButStatesAndTimestamps || rc == EqualButReadOnly {
// No need to update tokens or zones. Only states and timestamps
// have changed. (If Equal, nothing has changed, but that doesn't happen
// when watching the ring for updates).
r.mtx.Lock()
r.ringDesc = ringDesc
if rc == EqualButReadOnly && r.shuffledSubringCache != nil {
// Invalidate all cached subrings.
r.shuffledSubringCache = make(map[subringCacheKey]*Ring)
}
r.updateRingMetrics(rc)
r.mtx.Unlock()
return
Expand Down Expand Up @@ -852,7 +856,9 @@ func (r *Ring) shuffleShard(identifier string, size int, lookbackPeriod time.Dur

// If the lookback is enabled and this instance has been registered within the lookback period
// then we should include it in the subring but continuing selecting instances.
if lookbackPeriod > 0 && instance.RegisteredTimestamp >= lookbackUntil {
// If an instance is in READONLY we should always extend. The write path will filter it out when GetRing.
// The read path should extend to get new ingester used on write
if (lookbackPeriod > 0 && instance.RegisteredTimestamp >= lookbackUntil) || instance.State == READONLY {
continue
}

Expand Down
105 changes: 105 additions & 0 deletions pkg/ring/ring_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2523,6 +2523,111 @@ func TestRing_ShuffleShardWithLookback(t *testing.T) {
}
}

// TestRing_ShuffleShardWithReadOnlyIngesters checks how ShuffleShard sizes the
// subring when the ring contains READONLY instances: the returned shard is
// expected to be extended past shardSize so that READONLY instances are
// included in addition to the requested number of other instances.
func TestRing_ShuffleShardWithReadOnlyIngesters(t *testing.T) {
	g := NewRandomTokenGenerator()

	const userID = "user-1"

	tests := map[string]struct {
		ringInstances         map[string]InstanceDesc
		ringReplicationFactor int
		shardSize             int
		expectedSize          int
		expectedToBePresent   []string
	}{
		"single zone, shard size = 1, default scenario": {
			ringInstances: map[string]InstanceDesc{
				"instance-1": {Addr: "127.0.0.1", Zone: "zone-a", State: ACTIVE, Tokens: g.GenerateTokens(NewDesc(), "instance-1", "zone-a", 128, true)},
				"instance-2": {Addr: "127.0.0.2", Zone: "zone-a", State: ACTIVE, Tokens: g.GenerateTokens(NewDesc(), "instance-2", "zone-a", 128, true)},
			},
			ringReplicationFactor: 1,
			shardSize:             1,
			expectedSize:          1,
		},
		"single zone, shard size = 2, not filter ReadOnly": {
			ringInstances: map[string]InstanceDesc{
				"instance-1": {Addr: "127.0.0.1", Zone: "zone-a", State: ACTIVE, Tokens: g.GenerateTokens(NewDesc(), "instance-1", "zone-a", 128, true)},
				"instance-2": {Addr: "127.0.0.2", Zone: "zone-a", State: READONLY, Tokens: g.GenerateTokens(NewDesc(), "instance-2", "zone-a", 128, true)},
			},
			ringReplicationFactor: 1,
			shardSize:             2,
			expectedSize:          2,
		},
		"single zone, shard size = 4, do not filter other states": {
			ringInstances: map[string]InstanceDesc{
				"instance-1": {Addr: "127.0.0.1", Zone: "zone-a", State: ACTIVE, Tokens: g.GenerateTokens(NewDesc(), "instance-1", "zone-a", 128, true)},
				"instance-2": {Addr: "127.0.0.2", Zone: "zone-a", State: JOINING, Tokens: g.GenerateTokens(NewDesc(), "instance-2", "zone-a", 128, true)},
				"instance-3": {Addr: "127.0.0.3", Zone: "zone-a", State: LEAVING, Tokens: g.GenerateTokens(NewDesc(), "instance-3", "zone-a", 128, true)},
				"instance-4": {Addr: "127.0.0.4", Zone: "zone-a", State: PENDING, Tokens: g.GenerateTokens(NewDesc(), "instance-4", "zone-a", 128, true)},
			},
			ringReplicationFactor: 1,
			shardSize:             4,
			expectedSize:          4,
		},
		"single zone, shard size = 2, extend on readOnly": {
			ringInstances: map[string]InstanceDesc{
				"instance-1": {Addr: "127.0.0.1", Zone: "zone-a", State: ACTIVE, Tokens: []uint32{2}},
				"instance-2": {Addr: "127.0.0.2", Zone: "zone-a", State: ACTIVE, Tokens: []uint32{4}},
				"instance-3": {Addr: "127.0.0.3", Zone: "zone-a", State: ACTIVE, Tokens: []uint32{6}},
				"instance-4": {Addr: "127.0.0.4", Zone: "zone-a", State: READONLY, Tokens: []uint32{1, 3, 5}},
			},
			ringReplicationFactor: 1,
			shardSize:             2,
			expectedSize:          3,
			expectedToBePresent:   []string{"instance-4"},
		},
		"rf = 3, shard size = 6, extend readOnly from different zones": {
			ringInstances: map[string]InstanceDesc{
				"instance-1": {Addr: "127.0.0.1", Zone: "zone-a", State: ACTIVE, Tokens: []uint32{2}},
				"instance-2": {Addr: "127.0.0.2", Zone: "zone-b", State: ACTIVE, Tokens: []uint32{12}},
				"instance-3": {Addr: "127.0.0.3", Zone: "zone-c", State: ACTIVE, Tokens: []uint32{22}},
				"instance-4": {Addr: "127.0.0.4", Zone: "zone-a", State: ACTIVE, Tokens: []uint32{4}},
				"instance-5": {Addr: "127.0.0.5", Zone: "zone-b", State: ACTIVE, Tokens: []uint32{14}},
				"instance-6": {Addr: "127.0.0.6", Zone: "zone-c", State: ACTIVE, Tokens: []uint32{24}},
				"instance-7": {Addr: "127.0.0.7", Zone: "zone-a", State: READONLY, Tokens: []uint32{1, 3}},
				"instance-8": {Addr: "127.0.0.8", Zone: "zone-b", State: READONLY, Tokens: []uint32{11, 13}},
				"instance-9": {Addr: "127.0.0.9", Zone: "zone-c", State: READONLY, Tokens: []uint32{21, 23}},
			},
			ringReplicationFactor: 3,
			shardSize:             6,
			expectedSize:          9,
			expectedToBePresent:   []string{"instance-7", "instance-8", "instance-9"},
		},
	}

	for testName, testData := range tests {
		t.Run(testName, func(t *testing.T) {
			// Init the ring directly from the table's instances.
			ringDesc := &Desc{Ingesters: testData.ringInstances}

			ring := Ring{
				cfg: Config{
					ReplicationFactor: testData.ringReplicationFactor,
				},
				ringDesc:            ringDesc,
				ringTokens:          ringDesc.GetTokens(),
				ringTokensByZone:    ringDesc.getTokensByZone(),
				ringInstanceByToken: ringDesc.getTokensInfo(),
				ringZones:           getZones(ringDesc.getTokensByZone()),
				strategy:            NewDefaultReplicationStrategy(),
				KVClient:            &MockClient{},
			}

			shardRing := ring.ShuffleShard(userID, testData.shardSize)
			assert.Equal(t, testData.expectedSize, shardRing.InstancesCount(), "unexpected number of instances in the shard")
			for _, expectedInstance := range testData.expectedToBePresent {
				assert.True(t, shardRing.HasInstance(expectedInstance), "expected %s to be part of the shard", expectedInstance)
			}
		})
	}
}

func TestRing_ShuffleShardWithLookback_CorrectnessWithFuzzy(t *testing.T) {
// The goal of this test is NOT to ensure that the minimum required number of instances
// are returned at any given time, BUT at least all required instances are returned.
Expand Down

0 comments on commit b48f93b

Please sign in to comment.