From b37287bf14c2af8a152a8a29c1005596125be940 Mon Sep 17 00:00:00 2001 From: Kostas Kyrimis Date: Thu, 12 Dec 2024 13:19:14 +0100 Subject: [PATCH] chore: test metrics for huge value serialization (#4262) * fix seeder bugs * add test * add assertions for huge value metrics Signed-off-by: kostas --- tests/dragonfly/instance.py | 3 +- tests/dragonfly/replication_test.py | 48 +++++++++++++++--------- tests/dragonfly/seeder/__init__.py | 10 ++--- tests/dragonfly/seeder/script-genlib.lua | 26 +++++++------ 4 files changed, 51 insertions(+), 36 deletions(-) diff --git a/tests/dragonfly/instance.py b/tests/dragonfly/instance.py index 3814d970c9ef..8feaeea0d97c 100644 --- a/tests/dragonfly/instance.py +++ b/tests/dragonfly/instance.py @@ -426,8 +426,7 @@ def create(self, existing_port=None, path=None, version=100, **kwargs) -> DflyIn args.setdefault("log_dir", self.params.log_dir) if version >= 1.21 and "serialization_max_chunk_size" not in args: - # Add 1 byte limit for big values - args.setdefault("serialization_max_chunk_size", 1) + args.setdefault("serialization_max_chunk_size", 16384) for k, v in args.items(): args[k] = v.format(**self.params.env) if isinstance(v, str) else v diff --git a/tests/dragonfly/replication_test.py b/tests/dragonfly/replication_test.py index 6ff68fc390e9..aae2311b8cba 100644 --- a/tests/dragonfly/replication_test.py +++ b/tests/dragonfly/replication_test.py @@ -44,23 +44,21 @@ async def wait_for_replicas_state(*clients, state="online", node_role="slave", t @pytest.mark.parametrize( - "t_master, t_replicas, seeder_config, stream_target, big_value", + "t_master, t_replicas, seeder_config, stream_target", [ # Quick general test that replication is working - (1, 3 * [1], dict(key_target=1_000), 500, False), - (4, [4, 4], dict(key_target=10_000), 1_000, False), - pytest.param(6, [6, 6, 6], dict(key_target=100_000), 20_000, False, marks=M_OPT), + (1, 3 * [1], dict(key_target=1_000), 500), + # A lot of huge values + (2, 2 * [1], dict(key_target=1_000, huge_value_percentage=2), 500), + (4, [4, 4], dict(key_target=10_000), 1_000), + pytest.param(6, [6, 6, 6], dict(key_target=100_000), 20_000, marks=M_OPT), # Skewed tests with different thread ratio - pytest.param(8, 6 * [1], dict(key_target=5_000), 2_000, False, marks=M_SLOW), - pytest.param(2, [8, 8], dict(key_target=10_000), 2_000, False, marks=M_SLOW), - # Test with big value size - pytest.param(2, [2], dict(key_target=1_000, data_size=10_000), 100, False, marks=M_SLOW), - # Test with big value and big value serialization - pytest.param(2, [2], dict(key_target=1_000, data_size=10_000), 100, True, marks=M_SLOW), + pytest.param(8, 6 * [1], dict(key_target=5_000), 2_000, marks=M_SLOW), + pytest.param(2, [8, 8], dict(key_target=10_000), 2_000, marks=M_SLOW), + # Everything is big because data size is 10k + pytest.param(2, [2], dict(key_target=1_000, data_size=10_000), 100, marks=M_SLOW), # Stress test - pytest.param( - 8, [8, 8], dict(key_target=1_000_000, units=16), 50_000, False, marks=M_STRESS - ), + pytest.param(8, [8, 8], dict(key_target=1_000_000, units=16), 50_000, marks=M_STRESS), ], ) @pytest.mark.parametrize("mode", [({}), ({"cache_mode": "true"})]) @@ -70,7 +68,6 @@ async def test_replication_all( t_replicas, seeder_config, stream_target, - big_value, mode, ): args = {} @@ -78,9 +75,6 @@ async def test_replication_all( args["cache_mode"] = "true" args["maxmemory"] = str(t_master * 256) + "mb" - if big_value: - args["serialization_max_chunk_size"] = 4096 - master = df_factory.create(admin_port=ADMIN_PORT, proactor_threads=t_master, **args) replicas = [ df_factory.create(admin_port=ADMIN_PORT + i + 1, proactor_threads=t) @@ -132,6 +126,26 @@ async def check(): # Check data after stable state stream await check() + info = await c_master.info() + preemptions = info["big_value_preemptions"] + key_target = seeder_config["key_target"] + # Rough estimate + estimated_preemptions = key_target * (0.01) + assert preemptions > estimated_preemptions + + # Because data size could be 10k and for that case there will be almost a preemption + # per bucket. + if "data_size" not in seeder_config.keys(): + total_buckets = info["num_buckets"] + # We care that we preempt less times than the total buckets such that we can be + # sure that we test both flows (with and without preemptions). Preemptions on 30% + # of buckets seems like a big number but that depends on a few parameters like + # the size of the hug value and the serialization max chunk size. For the test cases here, + # it's usually close to 10% but there are some that are close to 30. + total_buckets = info["num_buckets"] + logging.debug(f"Buckets {total_buckets}. Preemptions {preemptions}") + assert preemptions <= (total_buckets * 0.3) + async def check_replica_finished_exec(c_replica: aioredis.Redis, m_offset): role = await c_replica.role() diff --git a/tests/dragonfly/seeder/__init__.py b/tests/dragonfly/seeder/__init__.py index 01aefdb7e880..3c55dbe28b04 100644 --- a/tests/dragonfly/seeder/__init__.py +++ b/tests/dragonfly/seeder/__init__.py @@ -138,10 +138,10 @@ def __init__( data_size=100, collection_size=None, types: typing.Optional[typing.List[str]] = None, - huge_value_percentage=1, - huge_value_size=1024, - # 1 huge entries per container/key as default - huge_value_csize=1, + huge_value_percentage=0, + huge_value_size=10000, + # 2 huge entries per container/key as default + huge_value_csize=2, ): SeederBase.__init__(self, types) self.key_target = key_target @@ -216,6 +216,6 @@ async def _run_unit(client: aioredis.Redis, sha: str, unit: Unit, using_stopkey, msg = f"running unit {unit.prefix}/{unit.type} took {time.time() - s}, target {args[4+0]}" if huge_keys > 0: - msg = f"{msg}. Total huge keys added {huge_keys} with {args[11]} elements each. Total extra modified huge entries {huge_entries}." + msg = f"{msg}. Total huge keys added {huge_keys} with {args[11]} elements each. Total huge entries {huge_entries}." logging.debug(msg) diff --git a/tests/dragonfly/seeder/script-genlib.lua b/tests/dragonfly/seeder/script-genlib.lua index fb497c17fc98..45d672a0a199 100644 --- a/tests/dragonfly/seeder/script-genlib.lua +++ b/tests/dragonfly/seeder/script-genlib.lua @@ -56,22 +56,23 @@ end function LG_funcs.add_list(key, keys) local is_huge = keys[key] - redis.apcall('LPUSH', key, unpack(randstr_sequence(is_huge))) + --- TODO -- investigate why second case of replication_test_all fails + --- we somehow create a quicklist that is circular and we deadlock + redis.apcall('LPUSH', key, unpack(randstr_sequence(false))) end function LG_funcs.mod_list(key, keys) -- equally likely pops and pushes, we rely on the list size being large enough -- to "highly likely" not get emptied out by consequitve pops - local is_huge = keys[key] local action = math.random(1, 4) if action == 1 then redis.apcall('RPOP', key) elseif action == 2 then redis.apcall('LPOP', key) elseif action == 3 then - redis.apcall('LPUSH', key, randstr(is_huge)) + redis.apcall('LPUSH', key, randstr(false)) else - redis.apcall('RPUSH', key, randstr(is_huge)) + redis.apcall('RPUSH', key, randstr(false)) end end @@ -101,7 +102,7 @@ function LG_funcs.mod_set(key, keys) redis.apcall('SPOP', key) else local is_huge = keys[key] - redis.apcall('SADD', key, randstr(is_huge)) + redis.apcall('SADD', key, randstr(false)) end end @@ -113,19 +114,21 @@ end function LG_funcs.add_hash(key, keys) local blobs local is_huge = keys[key] + local limit = LG_funcs.csize if is_huge then - blobs = dragonfly.randstr(LG_funcs.huge_value_size, LG_funcs.csize / 2) + limit = LG_funcs.huge_value_csize + blobs = dragonfly.randstr(LG_funcs.huge_value_size, limit) huge_entries = huge_entries + 1 else blobs = dragonfly.randstr(LG_funcs.esize, LG_funcs.csize / 2) end local htable = {} - for i = 1, LG_funcs.csize, 2 do + for i = 1, limit, 2 do htable[i * 2 - 1] = tostring(i) htable[i * 2] = math.random(0, 1000) end - for i = 2, LG_funcs.csize, 2 do + for i = 2, limit, 2 do htable[i * 2 - 1] = tostring(i) htable[i * 2] = blobs[i // 2] end @@ -137,8 +140,7 @@ function LG_funcs.mod_hash(key, keys) if idx % 2 == 1 then redis.apcall('HINCRBY', key, tostring(idx), 1) else - local is_huge = keys[key] - redis.apcall('HSET', key, tostring(idx), randstr(is_huge)) + redis.apcall('HSET', key, tostring(idx), randstr(false)) end end @@ -166,8 +168,8 @@ end function LG_funcs.mod_zset(key, keys) local action = math.random(1, 4) if action <= 2 then - local is_huge = keys[key] - redis.apcall('ZADD', key, math.random(0, LG_funcs.csize * 2), randstr(is_huge)) + local size = LG_funcs.csize * 2 + redis.apcall('ZADD', key, math.random(0, size), randstr(false)) elseif action == 3 then redis.apcall('ZPOPMAX', key) else