diff --git a/__main__.py b/__main__.py
index 4c7f1d8c585ff1267ecd761bbd44072f7b2eb400..b26e498a373d0b61fe8f97013cc05e29838b91fe 100755
--- a/__main__.py
+++ b/__main__.py
@@ -79,6 +79,9 @@ from .tests.basic.basic import BASIC_TESTS
 from .tests.asgn3.availability.availability_basic import AVAILABILITY_TESTS
 from .tests.asgn3.causal_consistency.causal_basic import CAUSAL_TESTS
 from .tests.asgn3.eventual_consistency.convergence_basic import CONVERGENCE_TESTS
+# from .tests.asgn3.view_change.view_change_basic import VIEW_CHANGE_TESTS
+from .tests.proxy.basic_proxy import PROXY_TESTS
+from .tests.shuffle.basic_shuffle import SHUFFLE_TESTS
 
 TEST_SET = []
 TEST_SET.append(TestCase("hello_cluster", hello_cluster))
@@ -86,6 +89,9 @@ TEST_SET.extend(BASIC_TESTS)
 TEST_SET.extend(AVAILABILITY_TESTS)
 TEST_SET.extend(CAUSAL_TESTS)
 TEST_SET.extend(CONVERGENCE_TESTS)
+TEST_SET.extend(PROXY_TESTS)
+TEST_SET.extend(SHUFFLE_TESTS)
+# TEST_SET.extend(VIEW_CHANGE_TESTS)
 
 # set to True to stop at the first failing test
 FAIL_FAST = True
diff --git a/tests/basic/basic.py b/tests/basic/basic.py
index 627cd4b8fcf97e1332c7466beecbc474a129fca7..94ceb01e9357c454340b86cdd00b2343dc092f37 100644
--- a/tests/basic/basic.py
+++ b/tests/basic/basic.py
@@ -66,5 +66,83 @@ def basic_kv_1(conductor: ClusterConductor, dir, log: Logger):
 
     return True, 0
 
+
+def basic_kv_verify_proxy(conductor: ClusterConductor, dir, log: Logger):
+    with KVSTestFixture(conductor, dir, log, node_count=4) as fx:
+        c1 = KVSMultiClient(fx.clients, "c1", log)
+        c2 = KVSMultiClient(fx.clients, "c2", log)
+        c3 = KVSMultiClient(fx.clients, "c3", log)
+        c4 = KVSMultiClient(fx.clients, "c4", log)
+        conductor.add_shard("shard1", conductor.get_nodes([0, 1]))
+        conductor.add_shard("shard2", conductor.get_nodes([2, 3]))
+        fx.broadcast_view(conductor.get_shard_view())
+
+        keys = ["a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k"]
+        values = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]
+
+        for i in range(len(keys)):
+            key = keys[i]
+            value = str(values[i])
+            r = c1.put(0, key, value)
+            assert r.ok, f"expected ok for new key, got {r.status_code}"
+
+        conductor.create_partition([0, 1], "p0")
+        conductor.create_partition([2, 3], "p1")
+
+        r = c2.get_all(0)
+        assert r.ok, f"expected ok for get_all, got {r.status_code}"
+        shard1_keys = r.json()["items"]
+
+        r = c3.get_all(2)
+        assert r.ok, f"expected ok for get_all, got {r.status_code}"
+        shard2_keys = r.json()["items"]
+
+        log(f"shard1 keys: {shard1_keys}\n")
+        log(f"shard2 keys: {shard2_keys}\n")
+        assert (len(shard1_keys) > 0) and (len(shard2_keys) > 0), (
+            "one shard has no keys; with 11 keys this is extremely unlikely "
+            "(~1/2^10) and probably means something is wrong"
+        )
+
+        rk1 = list(shard1_keys.keys())[0]
+        rk2 = list(shard2_keys.keys())[0]
+
+        # rk2 belongs to shard2, which node 0 cannot reach while partitioned
+        r = c4.put(0, rk2, "This should fail")
+        assert r.status_code == 408, f"expected 408 while partitioned, got {r.status_code}"
+
+        # rk1 belongs to shard1, which node 2 cannot reach while partitioned
+        r = c4.put(2, rk1, "This should also fail")
+        assert r.status_code == 408, f"expected 408 while partitioned, got {r.status_code}"
+
+        # heal the partition: all four nodes back on one network
+        conductor.create_partition([0, 1, 2, 3], "base")
+
+        r = c4.put(0, rk2, "This should work")
+        assert r.ok, f"expected ok after healing, got {r.status_code}"
+
+        r = c4.put(2, rk1, "This should also work")
+        assert r.ok, f"expected ok after healing, got {r.status_code}"
+
+        r = c2.get_all(0)
+        assert r.ok, f"expected ok for get_all, got {r.status_code}"
+        shard1_keys = r.json()["items"]
+
+        r = c3.get_all(2)
+        assert r.ok, f"expected ok for get_all, got {r.status_code}"
+        shard2_keys = r.json()["items"]
+
+        log(f"shard1 keys: {shard1_keys}\n")
+        log(f"shard2 keys: {shard2_keys}\n")
+        assert len(shard1_keys) > 0 and len(shard2_keys) > 0, (
+            "one shard has no keys; with 11 keys this is extremely unlikely "
+            "(~1/2^10) and probably means something is wrong"
+        )
+
+        return True, 0
+
-BASIC_TESTS = [TestCase("basic_kv_1", basic_kv_1), TestCase("basic_kv_view_accept", basic_kv_view_accept)]
+BASIC_TESTS = [
+    TestCase("basic_kv_1", basic_kv_1),
+    TestCase("basic_kv_view_accept", basic_kv_view_accept),
+    TestCase("basic_kv_verify_proxy", basic_kv_verify_proxy),
+]
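The `basic_kv_verify_proxy` test above only works if every node agrees on which shard owns a key: a node that does not own a key must proxy the request to the owning shard, and an unreachable owner becomes a 408. A minimal sketch of such a mapping (the hash function and modulus are assumptions for illustration; the service under test may map keys differently):

    import hashlib

    def owning_shard(key: str, shard_names: list[str]) -> str:
        # stable across nodes: every replica computes the same owner for a key
        digest = hashlib.sha256(key.encode()).digest()
        return shard_names[int.from_bytes(digest[:8], "big") % len(shard_names)]

    # a node that does not own `key` proxies to owning_shard(key, ...);
    # if that shard is unreachable (partitioned), it responds 408
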
diff --git a/tests/helper.py b/tests/helper.py
index e6d3e6d549b2937f73ae6f5cc27801b9194fb150..6840234df42772e22722dfca42dc41b1c5b5d833 100644
--- a/tests/helper.py
+++ b/tests/helper.py
@@ -36,6 +36,15 @@ class KVSTestFixture:
             )
             self.log(f"view sent to node {i}: {r.status_code} {r.text}")
 
+    def rebroadcast_view(self, new_view: Dict[str, List[Dict[str, Any]]]):
+        # resend each client's last view, refreshed with addresses from new_view
+        for i, client in enumerate(self.clients):
+            r = client.resend_last_view_with_ips_from_new_view(new_view)
+            assert (
+                r.status_code == 200
+            ), f"expected 200 to ack view, got {r.status_code}"
+            self.log(f"view resent to node {i}: {r.status_code} {r.text}")
+
     def send_view(self, node_id: int, view: Dict[str, List[Dict[str, Any]]]):
         r = self.clients[node_id].send_view(view)
         assert r.status_code == 200, f"expected 200 to ack view, got {r.status_code}"
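`rebroadcast_view` exists because repartitioning can reassign container IPs: the logical view (shard membership) is unchanged, but the addresses in the last-sent view may be stale. A hypothetical usage sketch, using only names introduced in this patch:

    def heal_and_rebroadcast(conductor, fx):
        # reconnect all nodes on one network; container addresses may change
        conductor.create_partition([0, 1, 2, 3], "base")
        # resend each node's last logical view, with addresses refreshed
        # from the current shard view
        fx.rebroadcast_view(conductor.get_shard_view())
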
diff --git a/tests/proxy/basic_proxy.py b/tests/proxy/basic_proxy.py
new file mode 100644
index 0000000000000000000000000000000000000000..63661925a3bb66007864d00880401db62c8be009
--- /dev/null
+++ b/tests/proxy/basic_proxy.py
@@ -0,0 +1,100 @@
+from ...utils.containers import ClusterConductor
+from ...utils.testcase import TestCase
+from ...utils.util import Logger
+from ..helper import KVSMultiClient, KVSTestFixture
+from ...utils.kvs_api import DEFAULT_TIMEOUT
+
+
+def basic_proxy_one_client(conductor: ClusterConductor, dir, log: Logger):
+    with KVSTestFixture(conductor, dir, log, node_count=4) as fx:
+        c = KVSMultiClient(fx.clients, "client", log)
+        conductor.add_shard("shard1", conductor.get_nodes([0, 1]))
+        conductor.add_shard("shard2", conductor.get_nodes([2, 3]))
+        fx.broadcast_view(conductor.get_shard_view())
+
+        # test 1:
+        # put 300 keys round-robin across all nodes (proxying expected for
+        # keys owned by the other shard), then get_all() on one shard and
+        # ask the other shard for each key (proxying MUST happen here)
+        node_to_put = 0
+        base_key = "key"
+        for i in range(0, 300):
+            r = c.put(node_to_put, f"{base_key}{i}", f"{i}", timeout=10)
+            assert r.ok, f"expected ok for new key, got {r.status_code}"
+            node_to_put += 1
+            node_to_put = node_to_put % 4
+
+        r = c.get_all(0, timeout=None)  # no timeout: should get all of shard 1's keys
+        assert r.ok, f"expected ok for get_all, got {r.status_code}"
+        items = r.json()["items"]
+        keys = items.keys()
+        for key in keys:
+            r = c.get(2, key, timeout=30)
+            assert r.ok, f"expected ok for get, got {r.status_code}"
+            assert r.json()["value"] == items[key], f"wrong value returned: {r.json()}"
+
+        return True, "ok"
+
+
+def basic_proxy_many_clients(conductor: ClusterConductor, dir, log: Logger):
+    with KVSTestFixture(conductor, dir, log, node_count=7) as fx:
+        c = KVSMultiClient(fx.clients, "client", log)
+        conductor.add_shard("shard1", conductor.get_nodes([0, 1]))
+        conductor.add_shard("shard2", conductor.get_nodes([2, 3]))
+        conductor.add_shard("shard3", conductor.get_nodes([4, 5, 6]))
+        fx.broadcast_view(conductor.get_shard_view())
+
+        # same as test 1, but with a fresh client for every request
+        node_to_put = 0
+        base_key = "key"
+        for i in range(0, 1000):
+            c1 = KVSMultiClient(fx.clients, f"client{i}", log)
+            r = c1.put(node_to_put, f"{base_key}{i}", f"{i}", timeout=10)
+            assert r.ok, f"expected ok for new key, got {r.status_code}"
+            node_to_put += 1
+            node_to_put = node_to_put % 7
+
+        r = c.get_all(0, timeout=None)  # no timeout: should get all of shard 1's keys
+        assert r.ok, f"expected ok for get_all, got {r.status_code}"
+        items = r.json()["items"]
+        keys = items.keys()
+        for key in keys:
+            r = c.get(2, key, timeout=30)
+            assert r.ok, f"expected ok for get, got {r.status_code}"
+            assert r.json()["value"] == items[key], f"wrong value returned: {r.json()}"
+
+        return True, "ok"
+
+
+def basic_proxy_partitioned_shards(conductor: ClusterConductor, dir, log: Logger, timeout=5 * DEFAULT_TIMEOUT):
+    with KVSTestFixture(conductor, dir, log, node_count=4) as fx:
+        c = KVSMultiClient(fx.clients, "client", log)
+        conductor.add_shard("shard1", conductor.get_nodes([0, 1]))
+        conductor.add_shard("shard2", conductor.get_nodes([2, 3]))
+        conductor.create_partition([2, 3], "secondshard")
+        fx.broadcast_view(conductor.get_shard_view())
+
+        helper(c, timeout=timeout)
+        return True, "ok"
+
+
+def helper(c: KVSMultiClient, timeout=5 * DEFAULT_TIMEOUT):
+    # test 2:
+    # with the shards partitioned from each other, put a bunch of keys.
+    # puts owned by the local shard succeed; puts that must be proxied
+    # across the partition cannot complete and must come back as 408
+    # within the timeout rather than hanging forever.
+    node_to_put = 0
+    base_key = "key"
+    for i in range(0, 50):
+        r = c.put(node_to_put, f"{base_key}{i}", f"{i}", timeout=timeout)
+        assert r.ok or r.status_code == 408, f"expected ok or 408, got {r.status_code}"
+        node_to_put += 1
+        node_to_put = node_to_put % 4
+
+
+PROXY_TESTS = [
+    TestCase("basic_proxy_one_client", basic_proxy_one_client),
+    TestCase("basic_proxy_many_clients", basic_proxy_many_clients),
+    TestCase("basic_proxy_partitioned_shards", basic_proxy_partitioned_shards),
+]
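The partitioned-shard test expects every put to either succeed locally or fail fast with 408 instead of hanging. A sketch of the server-side behavior this implies (the `/data/{key}` endpoint and the timeout value are assumptions, not part of this patch):

    import requests

    def proxy_put(owner_url: str, key: str, value: str, proxy_timeout: float = 5.0):
        # forward a put to the owning shard, degrading to 408 when the
        # owner cannot be reached before the timeout
        try:
            r = requests.put(f"{owner_url}/data/{key}",
                             json={"value": value}, timeout=proxy_timeout)
            return r.status_code, r.text
        except requests.exceptions.RequestException:
            return 408, "owning shard unreachable"
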
diff --git a/tests/shuffle/basic_shuffle.py b/tests/shuffle/basic_shuffle.py
new file mode 100644
index 0000000000000000000000000000000000000000..de404cc917eebd94002971e80e9e3df5cb160cb0
--- /dev/null
+++ b/tests/shuffle/basic_shuffle.py
@@ -0,0 +1,330 @@
+from ...utils.containers import ClusterConductor
+from ...utils.testcase import TestCase
+from ...utils.util import Logger
+from ..helper import KVSMultiClient, KVSTestFixture
+from ...utils.kvs_api import DEFAULT_TIMEOUT
+
+
+def basic_shuffle_add_remove(conductor: ClusterConductor, dir, log: Logger):
+    with KVSTestFixture(conductor, dir, log, node_count=3) as fx:
+        c = KVSMultiClient(fx.clients, "client", log)
+        conductor.add_shard("shard1", conductor.get_nodes([0]))
+        conductor.add_shard("shard2", conductor.get_nodes([1]))
+        fx.broadcast_view(conductor.get_shard_view())
+
+        node_to_put = 0
+        base_key = "key"
+        # Put 15 keys
+        for i in range(15):
+            log(f"Putting key {i}\n")
+            r = c.put(node_to_put, f"{base_key}{i}", f"{i}", timeout=10)
+            assert r.ok, f"expected ok for new key, got {r.status_code}"
+            node_to_put += 1
+            node_to_put = node_to_put % 2
+
+        # Get all keys
+        r = c.get_all(0, timeout=10)
+        assert r.ok, f"expected ok for get, got {r.status_code}"
+        shard1_keys = r.json()["items"]
+
+        r = c.get_all(1, timeout=10)
+        assert r.ok, f"expected ok for get, got {r.status_code}"
+        shard2_keys = r.json()["items"]
+
+        log(f"Shard 1 keys: {shard1_keys}\n")
+        log(f"Shard 2 keys: {shard2_keys}\n")
+
+        # Total number of keys should match the number of keys put
+        assert len(shard1_keys) + len(shard2_keys) == 15, f"expected 15 keys, got {len(shard1_keys) + len(shard2_keys)}"
+
+        # Add a 3rd shard, causing a shuffle. There should still be 15 keys at the end.
+        log("Adding 3rd shard\n")
+        conductor.add_shard("shard3", conductor.get_nodes([2]))
+        fx.broadcast_view(conductor.get_shard_view())
+
+        # Get the keys on shard 1
+        r = c.get_all(0, timeout=10)
+        assert r.ok, f"expected ok for get, got {r.status_code}"
+        shard1_keys = r.json()["items"]
+        log(f"Shard 1 keys: {shard1_keys}\n")
+
+        # Get the keys on shard 2
+        r = c.get_all(1, timeout=10)
+        assert r.ok, f"expected ok for get, got {r.status_code}"
+        shard2_keys = r.json()["items"]
+        log(f"Shard 2 keys: {shard2_keys}\n")
+
+        # Get the keys on shard 3
+        r = c.get_all(2, timeout=10)
+        assert r.ok, f"expected ok for get, got {r.status_code}"
+        shard3_keys = r.json()["items"]
+        log(f"Shard 3 keys: {shard3_keys}\n")
+
+        assert len(shard1_keys) + len(shard2_keys) + len(shard3_keys) == 15, f"expected 15 keys, got {len(shard1_keys) + len(shard2_keys) + len(shard3_keys)}"
+
+        # Remove shard 3, causing a shuffle. Its keys should be redistributed
+        # to the two remaining shards, so all 15 keys should still exist.
+        conductor.remove_shard("shard3")
+        fx.broadcast_view(conductor.get_shard_view())
+
+        r = c.get_all(0, timeout=10)  # should get all of shard 1's keys
+        assert r.ok, f"expected ok for get, got {r.status_code}"
+        shard1_keys = r.json()["items"]
+
+        r = c.get_all(1, timeout=10)  # should get all of shard 2's keys
+        assert r.ok, f"expected ok for get, got {r.status_code}"
+        shard2_keys = r.json()["items"]
+
+        assert len(shard1_keys) + len(shard2_keys) == 15, f"expected 15 keys, got {len(shard1_keys) + len(shard2_keys)}"
+
+        # Remove shard 2; its keys should be shuffled to shard 1.
+        conductor.remove_shard("shard2")
+        fx.broadcast_view(conductor.get_shard_view())
+
+        r = c.get_all(0, timeout=10)
+        assert r.ok, f"expected ok for get, got {r.status_code}"
+        shard1_keys_after_delete = r.json()["items"]
+
+        assert len(shard1_keys_after_delete) == 15, f"expected 15 keys, got {len(shard1_keys_after_delete)}"
+
+        return True, "ok"
+
+ log("Adding 3rd shard\n") + conductor.add_shard("shard3", conductor.get_nodes([2])) + fx.broadcast_view(conductor.get_shard_view()) + + # Get the keys on shard 1 + r = c.get_all(0, timeout=10) + assert r.ok, f"expected ok for get, got {r.status_code}" + res = r.json()["items"] + shard1_keys = res + + log(f"Shard 1 keys: {shard1_keys}\n") + + # get the keys on shard 2 + r = c.get_all(1, timeout=10) + assert r.ok, f"expected ok for get, got {r.status_code}" + res = r.json()["items"] + shard2_keys = res + + log(f"Shard 2 keys: {shard2_keys}\n") + + # get the keys on shard 3 + r = c.get_all(2, timeout=10) + assert r.ok, f"expected ok for get, got {r.status_code}" + res = r.json()["items"] + shard3_keys = res + + log(f"Shard 3 keys: {shard3_keys}\n") + + assert len(shard1_keys) + len(shard2_keys) + len(shard3_keys) == 15, f"expected 15 keys, got {len(shard1_keys) + len(shard2_keys) + len(shard3_keys)}" + + # Remove shard 3, causing a shuffle. Move Node 2 to shard 1 so the keys should still exist, and be shuffled + conductor.remove_shard("shard3") + conductor.add_node_to_shard("shard1", conductor.get_node(2)) + fx.broadcast_view(conductor.get_shard_view()) + + r = c.get_all(0, timeout=10) # should get all of shard 1's keys + assert r.ok, f"expected ok for get, got {r.status_code}" + res = r.json()["items"] + shard1_keys = res + + r = c.get_all(1, timeout=10) # should get all of shard 2's keys + assert r.ok, f"expected ok for get, got {r.status_code}" + res = r.json()["items"] + shard2_keys = res + + assert len(shard1_keys) + len(shard2_keys) == 15, f"expected 15 keys, got {len(shard1_keys) + len(shard2_keys)}" + + # Remove shard 2. This loses keys. + conductor.remove_shard("shard2") + conductor.remove_node_from_shard("shard1", conductor.get_node(2)) + fx.broadcast_view(conductor.get_shard_view()) + + r = c.get_all(0, timeout=10) + assert r.ok, f"expected ok for get, got {r.status_code}" + res = r.json()["items"] + shard1_keys_after_delete = res + + assert len(shard1_keys_after_delete) == 15, f"expected 15 keys, got {len(shard1_keys_after_delete)}" + + + return True, "ok" + +def basic_shuffle_2(conductor: ClusterConductor, dir, log: Logger): + with KVSTestFixture(conductor, dir, log, node_count=3) as fx: + c = KVSMultiClient(fx.clients, "client", log) + conductor.add_shard("shard1", conductor.get_nodes([0, 1])) + fx.broadcast_view(conductor.get_shard_view()) + + ## basic shuffle 2 + # view= 1 shard with 2 nodes + # put 50 keys + # get_all keys from shard 1 + # add shard with 1 node + # get_all keys from shard 2 + # get_all keys from shard 1 + # check both returned sets are disjoint and that their union makes the original get_all results + + node_to_put = 0 + base_key = "key" + for i in range(0, 300): + r = c.put(node_to_put, f"{base_key}{i}", f"{i}", timeout=10) + assert r.ok, f"expected ok for new key, got {r.status_code}" + node_to_put += 1 + node_to_put = node_to_put % 3 + + r = c.get_all(0, timeout=10) # should get all of shard 1's keys + assert r.ok, f"expected ok for get, got {r.status_code}" + original_get_all = r.json()["items"] + + conductor.add_shard("shard2", conductor.get_nodes([2])) + fx.broadcast_view(conductor.get_shard_view()) + + r = c.get_all(2, timeout=10) # should get all of shard 2's keys + assert r.ok, f"expected ok for get, got {r.status_code}" + get_all_1 = r.json()["items"] + keys1 = get_all_1.keys() + + r = c.get_all(1, timeout=10) # should get all of shard 1's keys + assert r.ok, f"expected ok for get, got {r.status_code}" + get_all_2 = r.json()["items"] + keys2 = 
+
+def basic_shuffle_2(conductor: ClusterConductor, dir, log: Logger):
+    with KVSTestFixture(conductor, dir, log, node_count=3) as fx:
+        c = KVSMultiClient(fx.clients, "client", log)
+        conductor.add_shard("shard1", conductor.get_nodes([0, 1]))
+        fx.broadcast_view(conductor.get_shard_view())
+
+        # basic shuffle 2:
+        # view = 1 shard with 2 nodes
+        # put 300 keys
+        # get_all keys from shard 1
+        # add a shard with 1 node
+        # get_all keys from shard 2, then from shard 1
+        # check that the two result sets are disjoint and that their union
+        # equals the original get_all result
+        node_to_put = 0
+        base_key = "key"
+        for i in range(0, 300):
+            r = c.put(node_to_put, f"{base_key}{i}", f"{i}", timeout=10)
+            assert r.ok, f"expected ok for new key, got {r.status_code}"
+            node_to_put += 1
+            node_to_put = node_to_put % 2  # only nodes 0 and 1 are in the view
+
+        r = c.get_all(0, timeout=10)  # should get all of shard 1's keys
+        assert r.ok, f"expected ok for get, got {r.status_code}"
+        original_get_all = r.json()["items"]
+
+        conductor.add_shard("shard2", conductor.get_nodes([2]))
+        fx.broadcast_view(conductor.get_shard_view())
+
+        r = c.get_all(2, timeout=10)  # should get all of shard 2's keys
+        assert r.ok, f"expected ok for get, got {r.status_code}"
+        get_all_1 = r.json()["items"]
+        keys1 = get_all_1.keys()
+
+        r = c.get_all(1, timeout=10)  # should get all of shard 1's keys
+        assert r.ok, f"expected ok for get, got {r.status_code}"
+        get_all_2 = r.json()["items"]
+        keys2 = get_all_2.keys()
+
+        assert keys1.isdisjoint(keys2), "shards should not share keys after a shuffle"
+        assert original_get_all.keys() == keys1 | keys2
+        assert len(original_get_all) == len(keys1) + len(keys2)
+
+        return True, "ok"
+
+
+def basic_shuffle_3(conductor: ClusterConductor, dir, log: Logger):
+    with KVSTestFixture(conductor, dir, log, node_count=3) as fx:
+        c = KVSMultiClient(fx.clients, "client", log)
+        conductor.add_shard("shard1", conductor.get_nodes([0]))
+        conductor.add_shard("shard2", conductor.get_nodes([1]))
+        fx.broadcast_view(conductor.get_shard_view())
+
+        node_to_put = 0
+        base_key = "key"
+        # Put 15 keys
+        for i in range(15):
+            log(f"Putting key {i}\n")
+            r = c.put(node_to_put, f"{base_key}{i}", f"{i}", timeout=10)
+            assert r.ok, f"expected ok for new key, got {r.status_code}"
+            node_to_put += 1
+            node_to_put = node_to_put % 2
+
+        # Get all keys
+        r = c.get_all(0, timeout=10)
+        assert r.ok, f"expected ok for get, got {r.status_code}"
+        shard1_keys = r.json()["items"]
+
+        r = c.get_all(1, timeout=10)
+        assert r.ok, f"expected ok for get, got {r.status_code}"
+        shard2_keys = r.json()["items"]
+
+        log(f"Shard 1 keys: {shard1_keys}\n")
+        log(f"Shard 2 keys: {shard2_keys}\n")
+
+        # Total number of keys should match the number of keys put
+        assert len(shard1_keys) + len(shard2_keys) == 15, f"expected 15 keys, got {len(shard1_keys) + len(shard2_keys)}"
+
+        # Add a 3rd shard, causing a shuffle. There should still be 15 keys at the end.
+        log("Adding 3rd shard\n")
+        conductor.add_shard("shard3", conductor.get_nodes([2]))
+        fx.broadcast_view(conductor.get_shard_view())
+
+        # Get the keys on shard 1
+        r = c.get_all(0, timeout=10)
+        assert r.ok, f"expected ok for get, got {r.status_code}"
+        shard1_keys = r.json()["items"]
+        log(f"Shard 1 keys: {shard1_keys}\n")
+
+        # Get the keys on shard 2
+        r = c.get_all(1, timeout=10)
+        assert r.ok, f"expected ok for get, got {r.status_code}"
+        shard2_keys = r.json()["items"]
+        log(f"Shard 2 keys: {shard2_keys}\n")
+
+        # Get the keys on shard 3
+        r = c.get_all(2, timeout=10)
+        assert r.ok, f"expected ok for get, got {r.status_code}"
+        shard3_keys = r.json()["items"]
+        log(f"Shard 3 keys: {shard3_keys}\n")
+
+        assert len(shard1_keys) + len(shard2_keys) + len(shard3_keys) == 15, f"expected 15 keys, got {len(shard1_keys) + len(shard2_keys) + len(shard3_keys)}"
+
+        # Remove shard 3, causing a shuffle. Node 2 moves to shard 1, so all
+        # 15 keys should still exist.
+        conductor.remove_shard("shard3")
+        conductor.add_node_to_shard("shard1", conductor.get_node(2))
+        fx.broadcast_view(conductor.get_shard_view())
+
+        r = c.get_all(0, timeout=10)  # should get all of shard 1's keys
+        assert r.ok, f"expected ok for get, got {r.status_code}"
+        shard1_keys = r.json()["items"]
+
+        r = c.get_all(1, timeout=10)  # should get all of shard 2's keys
+        assert r.ok, f"expected ok for get, got {r.status_code}"
+        shard2_keys = r.json()["items"]
+
+        assert len(shard1_keys) + len(shard2_keys) == 15, f"expected 15 keys, got {len(shard1_keys) + len(shard2_keys)}"
+
+        # Remove shard 2; its keys should be shuffled to shard 1 (which now
+        # includes node 2).
+        conductor.remove_shard("shard2")
+        fx.broadcast_view(conductor.get_shard_view())
+
+        r = c.get_all(0, timeout=10)
+        assert r.ok, f"expected ok for get, got {r.status_code}"
+        shard1_keys_after_delete = r.json()["items"]
+
+        assert len(shard1_keys_after_delete) == 15, f"expected 15 keys, got {len(shard1_keys_after_delete)}"
+
+        return True, "ok"
+
+
+SHUFFLE_TESTS = [
+    TestCase("basic_shuffle_add_remove", basic_shuffle_add_remove),
+    TestCase("basic_shuffle", basic_shuffle),
+    TestCase("basic_shuffle_2", basic_shuffle_2),
+    TestCase("basic_shuffle_3", basic_shuffle_3),
+]
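All four shuffle tests repeat the same invariant: after a reshuffle, keys are conserved and no two shards hold the same key. A possible consolidation (not part of this patch) that the tests could share:

    def assert_resharded(shard_key_maps: list[dict], expected_total: int):
        # keys are conserved across the reshuffle...
        all_keys = [k for m in shard_key_maps for k in m]
        assert len(all_keys) == expected_total, \
            f"expected {expected_total} keys, got {len(all_keys)}"
        # ...and no key is owned by two shards at once
        assert len(set(all_keys)) == len(all_keys), "shards share keys"
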
diff --git a/utils/containers.py b/utils/containers.py
index 5737800419df035d9aad76973da9f7302f7e3833..1c677cd84e94aa7041930a4d5d434a38a9c2b132 100644
--- a/utils/containers.py
+++ b/utils/containers.py
@@ -44,6 +44,9 @@ class ClusterNode:
     )
     networks: List[str]  # networks the container is attached to
 
+    def get_view(self) -> dict:
+        return {"address": f"{self.ip}:{self.port}", "id": self.index}
+
     def internal_endpoint(self) -> str:
         return f"http://{self.ip}:{self.port}"
 
@@ -108,7 +111,7 @@ class ClusterConductor:
     def dump_all_container_logs(self, dir):
         self.log("dumping logs of kvs containers")
-        container_pattern = f"^kvs_{self.group_id}.*"
+        container_pattern = f"^kvs_{self.group_id}_.*"
         container_regex = re.compile(container_pattern)
 
         containers = self._list_containers()
@@ -202,9 +205,6 @@ class ClusterConductor:
             if container and container_regex.match(container)
         ]
         self._remove_containers(containers_to_remove)
-        # for container in containers:
-        #     if container and container_regex.match(container):
-        #         self._remove_container(container)
 
         # cleanup networks
         self.log(f"  cleaning up {'group' if group_only else 'all'} networks")
@@ -437,8 +437,8 @@ class ClusterConductor:
                 ].base_url = self.node_external_endpoint(node.index)
                 view_changed = True
         if view_changed and hasattr(self, "_parent"):
-            self._parent.broadcast_view(self.get_full_view())
+            self._parent.rebroadcast_view(self.get_shard_view())
 
     def create_partition(self, node_ids: List[int], partition_id: str) -> None:
         net_name = f"kvs_{self.group_id}_net_{partition_id}"
@@ -502,10 +502,11 @@ class ClusterConductor:
                 node.index
             ].base_url = self.node_external_endpoint(node.index)
             view_changed = True
-        if view_changed and hasattr(self, "_parent"):
-            self._parent.broadcast_view(self.get_full_view())
+        # NOTE: rebroadcasting here is deliberately disabled; tests rebroadcast
+        # explicitly via KVSTestFixture.rebroadcast_view after repartitioning.
+        # if view_changed and hasattr(self, "_parent"):
+        #     self._parent.rebroadcast_view(self.get_shard_view())
+
     def get_node(self, index):
         return self.nodes[index]
+
     def get_full_view(self):
         view = []
         for node in self.nodes:
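For reference, `ClusterNode.get_view()` above implies the shard-view shape that `get_shard_view()` returns and that `resend_last_view_with_ips_from_new_view` consumes: shard name mapped to a list of `{"address", "id"}` records. An inferred example (addresses are illustrative, not from the patch):

    example_shard_view = {
        "shard1": [
            {"address": "10.0.0.2:8080", "id": 0},
            {"address": "10.0.0.3:8080", "id": 1},
        ],
        "shard2": [
            {"address": "10.0.0.4:8080", "id": 2},
        ],
    }
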
diff --git a/utils/kvs_api.py b/utils/kvs_api.py
index 8353f1bba508cc2fce99b85e44c20cc0b01183d4..838662c4a9f1a9f9233e0e5b7d519b759fce364b 100644
--- a/utils/kvs_api.py
+++ b/utils/kvs_api.py
@@ -129,5 +129,29 @@ class KVSClient:
         if not isinstance(view, dict):
             raise ValueError("view must be a dict")
 
+        self.last_view = view  # remember the view so it can be resent later
         request_body = {"view": view}
         return requests.put(f"{self.base_url}/view", json=request_body, timeout=timeout)
+
+    def resend_last_view_with_ips_from_new_view(
+        self, current_view: Dict[str, List[Dict[str, Any]]], timeout: float = DEFAULT_TIMEOUT
+    ) -> requests.Response:
+        """Resend this client's last view, with addresses refreshed from current_view."""
+        if not isinstance(current_view, dict):
+            raise ValueError("view must be a dict")
+        if not hasattr(self, "last_view"):
+            raise LookupError("Must have sent at least one view before calling resend.")
+
+        # flatten the current view into a node id -> current address lookup
+        flattened_current_view = {}
+        for nodes in current_view.values():
+            for node in nodes:
+                flattened_current_view[node["id"]] = node["address"]
+
+        # rewrite the cached view's addresses in place, keyed by node id
+        for nodes in self.last_view.values():
+            for node in nodes:
+                node["address"] = flattened_current_view[node["id"]]
+
+        request_body = {"view": self.last_view}
+        return requests.put(f"{self.base_url}/view", json=request_body, timeout=timeout)
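The address-refresh logic in `resend_last_view_with_ips_from_new_view` mutates the cached view in place. A standalone walk-through of the same logic, under the view shape shown earlier (values are illustrative):

    last_view = {"shard1": [{"id": 0, "address": "10.0.0.2:8080"}]}
    current_view = {"shardX": [{"id": 0, "address": "10.0.9.9:8080"}]}

    # flatten the current view into an id -> address lookup
    addr_by_id = {n["id"]: n["address"]
                  for nodes in current_view.values() for n in nodes}

    # rewrite the cached view's addresses; raises KeyError if a node
    # from the last view no longer exists in the current one
    for nodes in last_view.values():
        for n in nodes:
            n["address"] = addr_by_id[n["id"]]

    assert last_view["shard1"][0]["address"] == "10.0.9.9:8080"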