diff --git a/__main__.py b/__main__.py
index b4b32def3c57c3864965c5f4e6393f483f491b77..17f249b26edd71541c11cb41a0bb9a7815fbb104 100755
--- a/__main__.py
+++ b/__main__.py
@@ -23,7 +23,13 @@ TEST_GROUP_ID = "hw3"
 
 
 class TestRunner:
-    def __init__(self, project_dir: str, debug_output_dir: str):
+    def __init__(
+        self,
+        project_dir: str,
+        debug_output_dir: str,
+        group_id=TEST_GROUP_ID,
+        thread_id="0",
+    ):
         self.project_dir = project_dir
         self.debug_output_dir = debug_output_dir
         # builder to build container image
@@ -32,7 +38,8 @@ class TestRunner:
         )
         # network manager to mess with container networking
         self.conductor = ClusterConductor(
-            group_id=TEST_GROUP_ID,
+            group_id=group_id,
+            thread_id=thread_id,
             base_image=CONTAINER_IMAGE_ID,
             external_port_base=9000,
             log=global_logger(),
@@ -48,7 +55,7 @@ class TestRunner:
 
         # aggressively clean up anything kvs-related
         # NOTE: this disallows parallel run processes, so turn it off for that
-        self.conductor.cleanup_hanging(group_only=False)
+        self.conductor.cleanup_hanging(group_only=True)
 
     def cleanup_environment(self) -> None:
         log("\n-- cleanup_environment --")
@@ -125,9 +132,9 @@ def main():
         "--num-threads", type=int, default=1, help="number of threads to run tests in"
     )
     parser.add_argument(
-        "--run-id",
-        default="",
-        help="Id for this run (prepended to docker containers & networks) (useful for running two versions of the test suite in parallel)",
+        "--group-id",
+        default=TEST_GROUP_ID,
+        help="Group Id (prepended to docker containers & networks) (useful for running two versions of the test suite in parallel)",
     )
     parser.add_argument(
         "--port-offset", type=int, default=1000, help="port offset for each test"
@@ -136,7 +143,12 @@ def main():
     args = parser.parse_args()
 
     project_dir = os.getcwd()
-    runner = TestRunner(project_dir=project_dir, debug_output_dir=DEBUG_OUTPUT_DIR)
+    runner = TestRunner(
+        project_dir=project_dir,
+        debug_output_dir=DEBUG_OUTPUT_DIR,
+        group_id=args.group_id,
+        thread_id="0",
+    )
     runner.prepare_environment(build=args.build)
 
     if args.filter is not None:
@@ -152,7 +164,7 @@ def main():
     log("\n== RUNNING TESTS ==")
     run_tests = []
 
-    def run_test(test: TestCase, gid: str, port_offset: int):
+    def run_test(test: TestCase, gid: str, thread_id: str, port_offset: int):
         log(f"\n== TEST: [{test.name}] ==\n")
         test_set_name = test.name.lower().split("_")[0]
         test_dir = create_test_dir(DEBUG_OUTPUT_DIR, test_set_name, test.name)
@@ -164,6 +176,7 @@ def main():
         logger = Logger(files=(log_file, sys.stderr))
         conductor = ClusterConductor(
             group_id=gid,
+            thread_id=f"{thread_id}",
             base_image=CONTAINER_IMAGE_ID,
             external_port_base=9000 + port_offset,
             log=logger,
@@ -182,7 +195,7 @@ def main():
     if args.num_threads == 1:
         print("Running tests sequentially")
         for test in TEST_SET:
-            if not run_test(test, gid=f"{args.run_id}0", port_offset=0):
+            if not run_test(test, gid=args.group_id, thread_id="0", port_offset=0):
                 if not args.run_all:
                     print("--run-all not set, stopping at first failure")
                     break
@@ -191,7 +204,10 @@ def main():
         pool = ThreadPool(processes=args.num_threads)
         pool.map(
             lambda a: run_test(
-                a[1], gid=f"{args.run_id}{a[0]}", port_offset=a[0] * args.port_offset
+                a[1],
+                gid=args.group_id,
+                thread_id=f"{a[0]}",
+                port_offset=a[0] * args.port_offset,
             ),
             enumerate(TEST_SET),
         )
diff --git a/tests/proxy/basic_proxy.py b/tests/proxy/basic_proxy.py
index 404fe65aec8dded48f6f6e7a6ff6141ef719e78f..4246b95b2718f93d352ae6b468085c3a5d33e162 100644
--- a/tests/proxy/basic_proxy.py
+++ b/tests/proxy/basic_proxy.py
@@ -24,7 +24,7 @@ def basic_proxy_one_client(conductor: ClusterConductor, dir, log: Logger):
         node_to_put += 1
         node_to_put = node_to_put % 4
 
-    r = c.get_all(0, timeout=120)  # should get all of shard 1's keys
+    r = c.get_all(0, timeout=20)  # should get all of shard 1's keys
     assert r.ok, f"expected ok for get, got {r.status_code}"
     items = r.json()["items"]
     keys = items.keys()
@@ -50,14 +50,14 @@ def basic_proxy_many_clients(conductor: ClusterConductor, dir, log: Logger):
     node_to_put = 0
     base_key = "key"
-    for i in range(0, 1000):
+    for i in range(0, 10000):
         c1 = KVSMultiClient(fx.clients, "client", log)
         r = c1.put(node_to_put, f"{base_key}{i}", f"{i}", timeout=10)
         assert r.ok, f"expected ok for new key, got {r.status_code}"
 
         node_to_put += 1
         node_to_put = node_to_put % 7
 
-    r = c.get_all(0, timeout=120)  # should get all of shard 1's keys
+    r = c.get_all(0, timeout=20)  # should get all of shard 1's keys
     assert r.ok, f"expected ok for get, got {r.status_code}"
     items = r.json()["items"]
     keys = items.keys()
diff --git a/utils/containers.py b/utils/containers.py
index 9d20fc9266f4000960148c528e4d119242e774c2..f36dcb5449fbe29990b015a180733bd9d26137b4 100644
--- a/utils/containers.py
+++ b/utils/containers.py
@@ -61,19 +61,21 @@ class ClusterConductor:
     def __init__(
         self,
         group_id: str,
+        thread_id: str,
         base_image: str,
         log: Logger,
        external_port_base: int = 8081,
     ):
         self.group_id = group_id
+        self.thread_id = thread_id
         self.base_image = base_image
         self.base_port = external_port_base
         self.nodes: List[ClusterNode] = []
         self.shards: dict[str, List[ClusterNode]] = {}
 
         # naming patterns
-        self.group_ctr_prefix = f"kvs_{group_id}_node"
-        self.group_net_prefix = f"kvs_{group_id}_net"
+        self.group_ctr_prefix = f"kvs_{group_id}_{thread_id}_node"
+        self.group_net_prefix = f"kvs_{group_id}_{thread_id}_net"
 
         # base network
         self.base_net_name = f"{self.group_net_prefix}_base"
@@ -112,7 +114,7 @@ class ClusterConductor:
 
     def dump_all_container_logs(self, dir):
         self.log("dumping logs of kvs containers")
-        container_pattern = f"^kvs_{self.group_id}_.*"
+        container_pattern = f"^kvs_{self.group_id}_{self.thread_id}_.*"
         container_regex = re.compile(container_pattern)
 
         containers = self._list_containers()
@@ -188,8 +190,8 @@ class ClusterConductor:
         # otherwise clean up anything kvs related
         if group_only:
             self.log(f"cleaning up group {self.group_id}")
-            container_pattern = f"^kvs_{self.group_id}_.*"
-            network_pattern = f"^kvs_{self.group_id}_net_.*"
+            container_pattern = f"^kvs_{self.group_id}_{self.thread_id}_.*"
+            network_pattern = f"^kvs_{self.group_id}_{self.thread_id}_net_.*"
         else:
             self.log("cleaning up all kvs containers and networks")
             container_pattern = "^kvs_.*"
@@ -226,7 +228,7 @@ class ClusterConductor:
         return False
 
     def _node_name(self, index: int) -> str:
-        return f"kvs_{self.group_id}_node_{index}"
+        return f"kvs_{self.group_id}_{self.thread_id}_node_{index}"
 
     def node_external_endpoint(self, index: int) -> str:
         return self.nodes[index].external_endpoint()
@@ -374,7 +376,7 @@ class ClusterConductor:
             self.log(f" {part_name}: {nodes}")
 
     def my_partition(self, node_ids: List[int], partition_id: str) -> None:
-        net_name = f"kvs_{self.group_id}_net_{partition_id}"
+        net_name = f"kvs_{self.group_id}_{self.thread_id}_net_{partition_id}"
         self.log(f"creating partition {partition_id} with nodes {node_ids}")
 
         # create partition network if it doesn't exist
@@ -450,7 +452,7 @@ class ClusterConductor:
             self._parent.rebroadcast_view(self.get_shard_view())
 
     def create_partition(self, node_ids: List[int], partition_id: str) -> None:
-        net_name = f"kvs_{self.group_id}_net_{partition_id}"
+        net_name = f"kvs_{self.group_id}_{self.thread_id}_net_{partition_id}"
 
         self.log(f"creating partition {partition_id} with nodes {node_ids}")
 
@@ -579,7 +581,7 @@ class ClusterConductor:
         }
 
     def get_partition_view(self, partition_id: str):
-        net_name = f"kvs_{self.group_id}_net_{partition_id}"
+        net_name = f"kvs_{self.group_id}_{self.thread_id}_net_{partition_id}"
         view = []
         for node in self.nodes:
             if net_name in node.networks:
diff --git a/utils/kvs_api.py b/utils/kvs_api.py
index a78427f0625416187421a2f449efc148bb7bfb48..ecd43e26626ef1866f7f3dfe62218c8ec02ff71c 100644
--- a/utils/kvs_api.py
+++ b/utils/kvs_api.py
@@ -136,7 +136,7 @@ class KVSClient:
         return requests.put(f"{self.base_url}/view", json=request_body, timeout=timeout)
 
     async def async_send_view(
-        self, view: dict[str, List[Dict[str, Any]]], timeout: float = DEFAULT_TIMEOUT
+        self, view: dict[str, List[Dict[str, Any]]], timeout: float = None
     ) -> aiohttp.ClientResponse:
         if not isinstance(view, dict):
             raise ValueError("view must be a dict")
@@ -151,7 +151,6 @@ class KVSClient:
             return response
         if response.status != 200:
             raise RuntimeError(f"failed to send view: {response.status}")
-        return response
 
     def resend_last_view_with_ips_from_new_view(
         self,