Skip to content
Snippets Groups Projects
Commit 2b533761 authored by Alexander Aghili's avatar Alexander Aghili
Browse files

fix shuffle testsw

parent f916d9b7
No related branches found
No related tags found
No related merge requests found
...@@ -4,6 +4,102 @@ from ...utils.util import Logger ...@@ -4,6 +4,102 @@ from ...utils.util import Logger
from ..helper import KVSMultiClient, KVSTestFixture from ..helper import KVSMultiClient, KVSTestFixture
from ...utils.kvs_api import DEFAULT_TIMEOUT from ...utils.kvs_api import DEFAULT_TIMEOUT
def basic_shuffle_add_remove(conductor: ClusterConductor, dir, log: Logger):
with KVSTestFixture(conductor, dir, log, node_count=3) as fx:
c = KVSMultiClient(fx.clients, "client", log)
conductor.add_shard("shard1", conductor.get_nodes([0]))
conductor.add_shard("shard2", conductor.get_nodes([1]))
fx.broadcast_view(conductor.get_shard_view())
node_to_put = 0
base_key = "key"
# Put 15 keys
for i in range(15):
log(f"Putting key {i}\n")
r = c.put(node_to_put, f"{base_key}{i}", f"{i}", timeout=10)
assert r.ok, f"expected ok for new key, got {r.status_code}"
node_to_put += 1
node_to_put = node_to_put % 2
# Get all keys
r = c.get_all(0, timeout=10)
assert r.ok, f"expected ok for get, got {r.status_code}"
res = r.json()["items"]
shard1_keys = res
r = c.get_all(1, timeout=10)
assert r.ok, f"expected ok for get, got {r.status_code}"
res = r.json()["items"]
shard2_keys = res
log(f"Shard 1 keys: {shard1_keys}\n")
log(f"Shard 2 keys: {shard2_keys}\n")
# Total number of keys should matched number of keys put
assert len(shard1_keys) + len(shard2_keys) == 15, f"expected 15 keys, got {len(shard1_keys) + len(shard2_keys)}"
# Add a 3rd shard, causing a shuffle. There should still be 15 keys at the end.
log("Adding 3rd shard\n")
conductor.add_shard("shard3", conductor.get_nodes([2]))
fx.broadcast_view(conductor.get_shard_view())
# Get the keys on shard 1
r = c.get_all(0, timeout=10)
assert r.ok, f"expected ok for get, got {r.status_code}"
res = r.json()["items"]
shard1_keys = res
log(f"Shard 1 keys: {shard1_keys}\n")
# get the keys on shard 2
r = c.get_all(1, timeout=10)
assert r.ok, f"expected ok for get, got {r.status_code}"
res = r.json()["items"]
shard2_keys = res
log(f"Shard 2 keys: {shard2_keys}\n")
# get the keys on shard 3
r = c.get_all(2, timeout=10)
assert r.ok, f"expected ok for get, got {r.status_code}"
res = r.json()["items"]
shard3_keys = res
log(f"Shard 3 keys: {shard3_keys}\n")
assert len(shard1_keys) + len(shard2_keys) + len(shard3_keys) == 15, f"expected 15 keys, got {len(shard1_keys) + len(shard2_keys) + len(shard3_keys)}"
# Remove shard 3, causing a shuffle. Move Node 2 to shard 1 so the keys should still exist, and be shuffled
conductor.remove_shard("shard3")
fx.broadcast_view(conductor.get_shard_view())
r = c.get_all(0, timeout=10) # should get all of shard 1's keys
assert r.ok, f"expected ok for get, got {r.status_code}"
res = r.json()["items"]
shard1_keys = res
r = c.get_all(1, timeout=10) # should get all of shard 2's keys
assert r.ok, f"expected ok for get, got {r.status_code}"
res = r.json()["items"]
shard2_keys = res
assert len(shard1_keys) + len(shard2_keys) == 15, f"expected 15 keys, got {len(shard1_keys) + len(shard2_keys)}"
# Remove shard 2. This loses keys.
conductor.remove_shard("shard2")
fx.broadcast_view(conductor.get_shard_view())
r = c.get_all(0, timeout=10)
assert r.ok, f"expected ok for get, got {r.status_code}"
res = r.json()["items"]
shard1_keys_after_delete = res
assert len(shard1_keys_after_delete) == 15, f"expected 15 keys, got {len(shard1_keys_after_delete)}"
return True, "ok"
def basic_shuffle(conductor: ClusterConductor, dir, log: Logger): def basic_shuffle(conductor: ClusterConductor, dir, log: Logger):
with KVSTestFixture(conductor, dir, log, node_count=3) as fx: with KVSTestFixture(conductor, dir, log, node_count=3) as fx:
c = KVSMultiClient(fx.clients, "client", log) c = KVSMultiClient(fx.clients, "client", log)
...@@ -89,6 +185,7 @@ def basic_shuffle(conductor: ClusterConductor, dir, log: Logger): ...@@ -89,6 +185,7 @@ def basic_shuffle(conductor: ClusterConductor, dir, log: Logger):
# Remove shard 2. This loses keys. # Remove shard 2. This loses keys.
conductor.remove_shard("shard2") conductor.remove_shard("shard2")
conductor.remove_node_from_shard("shard1", conductor.get_node(2))
fx.broadcast_view(conductor.get_shard_view()) fx.broadcast_view(conductor.get_shard_view())
r = c.get_all(0, timeout=10) r = c.get_all(0, timeout=10)
...@@ -96,7 +193,7 @@ def basic_shuffle(conductor: ClusterConductor, dir, log: Logger): ...@@ -96,7 +193,7 @@ def basic_shuffle(conductor: ClusterConductor, dir, log: Logger):
res = r.json()["items"] res = r.json()["items"]
shard1_keys_after_delete = res shard1_keys_after_delete = res
assert len(shard1_keys) == len(shard1_keys_after_delete), f"expected {len(shard1_keys)} keys, got {len(shard1_keys_after_delete)}" assert len(shard1_keys_after_delete) == 15, f"expected 15 keys, got {len(shard1_keys_after_delete)}"
return True, "ok" return True, "ok"
...@@ -151,4 +248,4 @@ def basic_shuffle_2(conductor: ClusterConductor, dir, log: Logger): ...@@ -151,4 +248,4 @@ def basic_shuffle_2(conductor: ClusterConductor, dir, log: Logger):
return True, "ok" return True, "ok"
SHUFFLE_TESTS = [TestCase("basic_shuffle", basic_shuffle), TestCase("basic_shuffle_2", basic_shuffle_2)] SHUFFLE_TESTS = [TestCase("basic_shuffle_add_remove", basic_shuffle_add_remove), TestCase("basic_shuffle", basic_shuffle), TestCase("basic_shuffle_2", basic_shuffle_2)]
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment