From 90350c7d19ab9a0365ee8ba6b453e289d32a8253 Mon Sep 17 00:00:00 2001 From: zphrs <z@zephiris.dev> Date: Fri, 14 Mar 2025 18:28:45 -0700 Subject: [PATCH] made shuffle not persist metadata --- tests/shuffle/basic_shuffle.py | 124 ++++++++++++++++++++++----------- 1 file changed, 85 insertions(+), 39 deletions(-) diff --git a/tests/shuffle/basic_shuffle.py b/tests/shuffle/basic_shuffle.py index 16f7acc..93e86ba 100644 --- a/tests/shuffle/basic_shuffle.py +++ b/tests/shuffle/basic_shuffle.py @@ -4,12 +4,13 @@ from ...utils.util import Logger from ..helper import KVSMultiClient, KVSTestFixture from ...utils.kvs_api import DEFAULT_TIMEOUT + def basic_shuffle_add_remove(conductor: ClusterConductor, dir, log: Logger): with KVSTestFixture(conductor, dir, log, node_count=3) as fx: - c = KVSMultiClient(fx.clients, "client", log) + c = KVSMultiClient(fx.clients, "client", log, persist_metadata=False) conductor.add_shard("shard1", conductor.get_nodes([0])) conductor.add_shard("shard2", conductor.get_nodes([1])) - + fx.broadcast_view(conductor.get_shard_view()) node_to_put = 0 @@ -17,27 +18,36 @@ def basic_shuffle_add_remove(conductor: ClusterConductor, dir, log: Logger): # Put 15 keys for i in range(15): log(f"Putting key {i}\n") + c.reset_model() r = c.put(node_to_put, f"{base_key}{i}", f"{i}", timeout=10) assert r.ok, f"expected ok for new key, got {r.status_code}" node_to_put += 1 node_to_put = node_to_put % 2 + c.reset_model() # Get all keys r = c.get_all(0, timeout=10) + c.reset_model() assert r.ok, f"expected ok for get, got {r.status_code}" res = r.json()["items"] shard1_keys = res + c.reset_model() + r = c.get_all(1, timeout=10) assert r.ok, f"expected ok for get, got {r.status_code}" res = r.json()["items"] shard2_keys = res + c.reset_model() + log(f"Shard 1 keys: {shard1_keys}\n") log(f"Shard 2 keys: {shard2_keys}\n") # Total number of keys should matched number of keys put - assert len(shard1_keys) + len(shard2_keys) == 15, f"expected 15 keys, got {len(shard1_keys) + len(shard2_keys)}" + assert len(shard1_keys) + len(shard2_keys) == 15, ( + f"expected 15 keys, got {len(shard1_keys) + len(shard2_keys)}" + ) # Add a 3rd shard, causing a shuffle. There should still be 15 keys at the end. log("Adding 3rd shard\n") @@ -49,6 +59,7 @@ def basic_shuffle_add_remove(conductor: ClusterConductor, dir, log: Logger): assert r.ok, f"expected ok for get, got {r.status_code}" res = r.json()["items"] shard1_keys = res + c.reset_model() log(f"Shard 1 keys: {shard1_keys}\n") @@ -57,6 +68,7 @@ def basic_shuffle_add_remove(conductor: ClusterConductor, dir, log: Logger): assert r.ok, f"expected ok for get, got {r.status_code}" res = r.json()["items"] shard2_keys = res + c.reset_model() log(f"Shard 2 keys: {shard2_keys}\n") @@ -65,26 +77,36 @@ def basic_shuffle_add_remove(conductor: ClusterConductor, dir, log: Logger): assert r.ok, f"expected ok for get, got {r.status_code}" res = r.json()["items"] shard3_keys = res - + + c.reset_model() + log(f"Shard 3 keys: {shard3_keys}\n") - assert len(shard1_keys) + len(shard2_keys) + len(shard3_keys) == 15, f"expected 15 keys, got {len(shard1_keys) + len(shard2_keys) + len(shard3_keys)}" + assert len(shard1_keys) + len(shard2_keys) + len(shard3_keys) == 15, ( + f"expected 15 keys, got {len(shard1_keys) + len(shard2_keys) + len(shard3_keys)}" + ) # Remove shard 3, causing a shuffle. Move Node 2 to shard 1 so the keys should still exist, and be shuffled conductor.remove_shard("shard3") fx.broadcast_view(conductor.get_shard_view()) - r = c.get_all(0, timeout=10) # should get all of shard 1's keys + r = c.get_all(0, timeout=10) # should get all of shard 1's keys assert r.ok, f"expected ok for get, got {r.status_code}" res = r.json()["items"] shard1_keys = res - r = c.get_all(1, timeout=10) # should get all of shard 2's keys + c.reset_model() + + r = c.get_all(1, timeout=10) # should get all of shard 2's keys assert r.ok, f"expected ok for get, got {r.status_code}" res = r.json()["items"] shard2_keys = res - assert len(shard1_keys) + len(shard2_keys) == 15, f"expected 15 keys, got {len(shard1_keys) + len(shard2_keys)}" + c.reset_model() + + assert len(shard1_keys) + len(shard2_keys) == 15, ( + f"expected 15 keys, got {len(shard1_keys) + len(shard2_keys)}" + ) # Remove shard 2. This loses keys. conductor.remove_shard("shard2") @@ -94,18 +116,22 @@ def basic_shuffle_add_remove(conductor: ClusterConductor, dir, log: Logger): assert r.ok, f"expected ok for get, got {r.status_code}" res = r.json()["items"] shard1_keys_after_delete = res - - assert len(shard1_keys_after_delete) == 15, f"expected 15 keys, got {len(shard1_keys_after_delete)}" + c.reset_model() + + assert len(shard1_keys_after_delete) == 15, ( + f"expected 15 keys, got {len(shard1_keys_after_delete)}" + ) return True, "ok" - + + def basic_shuffle_1(conductor: ClusterConductor, dir, log: Logger): with KVSTestFixture(conductor, dir, log, node_count=3) as fx: - c = KVSMultiClient(fx.clients, "client", log) + c = KVSMultiClient(fx.clients, "client", log, persist_metadata=False) conductor.add_shard("shard1", conductor.get_nodes([0])) conductor.add_shard("shard2", conductor.get_nodes([1])) - + fx.broadcast_view(conductor.get_shard_view()) node_to_put = 0 @@ -133,7 +159,9 @@ def basic_shuffle_1(conductor: ClusterConductor, dir, log: Logger): log(f"Shard 2 keys: {shard2_keys}\n") # Total number of keys should matched number of keys put - assert len(shard1_keys) + len(shard2_keys) == 15, f"expected 15 keys, got {len(shard1_keys) + len(shard2_keys)}" + assert len(shard1_keys) + len(shard2_keys) == 15, ( + f"expected 15 keys, got {len(shard1_keys) + len(shard2_keys)}" + ) # Add a 3rd shard, causing a shuffle. There should still be 15 keys at the end. log("Adding 3rd shard\n") @@ -161,27 +189,31 @@ def basic_shuffle_1(conductor: ClusterConductor, dir, log: Logger): assert r.ok, f"expected ok for get, got {r.status_code}" res = r.json()["items"] shard3_keys = res - + log(f"Shard 3 keys: {shard3_keys}\n") - assert len(shard1_keys) + len(shard2_keys) + len(shard3_keys) == 15, f"expected 15 keys, got {len(shard1_keys) + len(shard2_keys) + len(shard3_keys)}" + assert len(shard1_keys) + len(shard2_keys) + len(shard3_keys) == 15, ( + f"expected 15 keys, got {len(shard1_keys) + len(shard2_keys) + len(shard3_keys)}" + ) # Remove shard 3, causing a shuffle. Move Node 2 to shard 1 so the keys should still exist, and be shuffled conductor.remove_shard("shard3") conductor.add_node_to_shard("shard1", conductor.get_node(2)) fx.broadcast_view(conductor.get_shard_view()) - r = c.get_all(0, timeout=10) # should get all of shard 1's keys + r = c.get_all(0, timeout=10) # should get all of shard 1's keys assert r.ok, f"expected ok for get, got {r.status_code}" res = r.json()["items"] shard1_keys = res - r = c.get_all(1, timeout=10) # should get all of shard 2's keys + r = c.get_all(1, timeout=10) # should get all of shard 2's keys assert r.ok, f"expected ok for get, got {r.status_code}" res = r.json()["items"] shard2_keys = res - assert len(shard1_keys) + len(shard2_keys) == 15, f"expected 15 keys, got {len(shard1_keys) + len(shard2_keys)}" + assert len(shard1_keys) + len(shard2_keys) == 15, ( + f"expected 15 keys, got {len(shard1_keys) + len(shard2_keys)}" + ) # Remove shard 2. This loses keys. conductor.remove_shard("shard2") @@ -192,18 +224,20 @@ def basic_shuffle_1(conductor: ClusterConductor, dir, log: Logger): assert r.ok, f"expected ok for get, got {r.status_code}" res = r.json()["items"] shard1_keys_after_delete = res - - assert len(shard1_keys_after_delete) == 15, f"expected 15 keys, got {len(shard1_keys_after_delete)}" + assert len(shard1_keys_after_delete) == 15, ( + f"expected 15 keys, got {len(shard1_keys_after_delete)}" + ) return True, "ok" - + + def basic_shuffle_2(conductor: ClusterConductor, dir, log: Logger): with KVSTestFixture(conductor, dir, log, node_count=3) as fx: - c = KVSMultiClient(fx.clients, "client", log) + c = KVSMultiClient(fx.clients, "client", log, persist_metadata=False) conductor.add_shard("shard1", conductor.get_nodes([0, 1])) fx.broadcast_view(conductor.get_shard_view()) - + ## basic shuffle 2 # view= 1 shard with 2 nodes # put 50 keys @@ -221,19 +255,19 @@ def basic_shuffle_2(conductor: ClusterConductor, dir, log: Logger): node_to_put += 1 node_to_put = node_to_put % 3 - r = c.get_all(0, timeout=10) # should get all of shard 1's keys + r = c.get_all(0, timeout=10) # should get all of shard 1's keys assert r.ok, f"expected ok for get, got {r.status_code}" original_get_all = r.json()["items"] conductor.add_shard("shard2", conductor.get_nodes([2])) fx.broadcast_view(conductor.get_shard_view()) - r = c.get_all(2, timeout=10) # should get all of shard 2's keys + r = c.get_all(2, timeout=10) # should get all of shard 2's keys assert r.ok, f"expected ok for get, got {r.status_code}" get_all_1 = r.json()["items"] keys1 = get_all_1.keys() - r = c.get_all(1, timeout=10) # should get all of shard 1's keys + r = c.get_all(1, timeout=10) # should get all of shard 1's keys assert r.ok, f"expected ok for get, got {r.status_code}" get_all_2 = r.json()["items"] keys2 = get_all_2.keys() @@ -242,7 +276,7 @@ def basic_shuffle_2(conductor: ClusterConductor, dir, log: Logger): assert not (key in keys2) for key in keys2: assert not (key in keys1) - + assert original_get_all.keys() == keys1 | keys2 assert len(original_get_all) == len(keys1) + len(keys2) @@ -251,10 +285,10 @@ def basic_shuffle_2(conductor: ClusterConductor, dir, log: Logger): def basic_shuffle_3(conductor: ClusterConductor, dir, log: Logger): with KVSTestFixture(conductor, dir, log, node_count=3) as fx: - c = KVSMultiClient(fx.clients, "client", log) + c = KVSMultiClient(fx.clients, "client", log, persist_metadata=False) conductor.add_shard("shard1", conductor.get_nodes([0])) conductor.add_shard("shard2", conductor.get_nodes([1])) - + fx.broadcast_view(conductor.get_shard_view()) node_to_put = 0 @@ -282,7 +316,9 @@ def basic_shuffle_3(conductor: ClusterConductor, dir, log: Logger): log(f"Shard 2 keys: {shard2_keys}\n") # Total number of keys should matched number of keys put - assert len(shard1_keys) + len(shard2_keys) == 15, f"expected 15 keys, got {len(shard1_keys) + len(shard2_keys)}" + assert len(shard1_keys) + len(shard2_keys) == 15, ( + f"expected 15 keys, got {len(shard1_keys) + len(shard2_keys)}" + ) # Add a 3rd shard, causing a shuffle. There should still be 15 keys at the end. log("Adding 3rd shard\n") @@ -310,27 +346,31 @@ def basic_shuffle_3(conductor: ClusterConductor, dir, log: Logger): assert r.ok, f"expected ok for get, got {r.status_code}" res = r.json()["items"] shard3_keys = res - + log(f"Shard 3 keys: {shard3_keys}\n") - assert len(shard1_keys) + len(shard2_keys) + len(shard3_keys) == 15, f"expected 15 keys, got {len(shard1_keys) + len(shard2_keys) + len(shard3_keys)}" + assert len(shard1_keys) + len(shard2_keys) + len(shard3_keys) == 15, ( + f"expected 15 keys, got {len(shard1_keys) + len(shard2_keys) + len(shard3_keys)}" + ) # Remove shard 3, causing a shuffle. Move Node 2 to shard 1 so the keys should still exist, and be shuffled conductor.remove_shard("shard3") conductor.add_node_to_shard("shard1", conductor.get_node(2)) fx.broadcast_view(conductor.get_shard_view()) - r = c.get_all(0, timeout=10) # should get all of shard 1's keys + r = c.get_all(0, timeout=10) # should get all of shard 1's keys assert r.ok, f"expected ok for get, got {r.status_code}" res = r.json()["items"] shard1_keys = res - r = c.get_all(1, timeout=10) # should get all of shard 2's keys + r = c.get_all(1, timeout=10) # should get all of shard 2's keys assert r.ok, f"expected ok for get, got {r.status_code}" res = r.json()["items"] shard2_keys = res - assert len(shard1_keys) + len(shard2_keys) == 15, f"expected 15 keys, got {len(shard1_keys) + len(shard2_keys)}" + assert len(shard1_keys) + len(shard2_keys) == 15, ( + f"expected 15 keys, got {len(shard1_keys) + len(shard2_keys)}" + ) # Remove shard 2. This loses keys. conductor.remove_shard("shard2") @@ -340,11 +380,17 @@ def basic_shuffle_3(conductor: ClusterConductor, dir, log: Logger): assert r.ok, f"expected ok for get, got {r.status_code}" res = r.json()["items"] shard1_keys_after_delete = res - - assert len(shard1_keys_after_delete) == 15, f"expected 15 keys, got {len(shard1_keys_after_delete)}" + assert len(shard1_keys_after_delete) == 15, ( + f"expected 15 keys, got {len(shard1_keys_after_delete)}" + ) return True, "ok" -SHUFFLE_TESTS = [TestCase("basic_shuffle_add_remove", basic_shuffle_add_remove), TestCase("basic_shuffle_1", basic_shuffle_1), TestCase("basic_shuffle_2", basic_shuffle_2), TestCase("basic_shuffle_3", basic_shuffle_3)] +SHUFFLE_TESTS = [ + TestCase("basic_shuffle_add_remove", basic_shuffle_add_remove), + TestCase("basic_shuffle_1", basic_shuffle_1), + TestCase("basic_shuffle_2", basic_shuffle_2), + TestCase("basic_shuffle_3", basic_shuffle_3), +] -- GitLab