diff --git a/__main__.py b/__main__.py index cab3aa34c034e43bd017d6aa570e0146d188b097..3c3e15f12decde4364d2ebd67841a2e5db64065e 100755 --- a/__main__.py +++ b/__main__.py @@ -83,6 +83,7 @@ from .tests.asgn3.eventual_consistency.convergence_basic import CONVERGENCE_TEST # from .tests.asgn3.view_change.view_change_basic import VIEW_CHANGE_TESTS from .tests.proxy.basic_proxy import PROXY_TESTS from .tests.shuffle.basic_shuffle import SHUFFLE_TESTS +from .tests.bench.benchmark import BENCHMARKS from .tests.stress.stress_tests import STRESS_TESTS TEST_SET = [] @@ -93,6 +94,7 @@ TEST_SET.extend(CAUSAL_TESTS) TEST_SET.extend(CONVERGENCE_TESTS) TEST_SET.extend(PROXY_TESTS) TEST_SET.extend(SHUFFLE_TESTS) +TEST_SET.extend(BENCHMARKS) TEST_SET.extend(STRESS_TESTS) # TEST_SET.extend(VIEW_CHANGE_TESTS) diff --git a/pkg/cse138_asgn3_tests/__main__.py b/pkg/cse138_asgn4_tests/__main__.py similarity index 100% rename from pkg/cse138_asgn3_tests/__main__.py rename to pkg/cse138_asgn4_tests/__main__.py diff --git a/pkg/cse138_asgn3_tests/tests b/pkg/cse138_asgn4_tests/tests similarity index 100% rename from pkg/cse138_asgn3_tests/tests rename to pkg/cse138_asgn4_tests/tests diff --git a/pkg/cse138_asgn3_tests/utils b/pkg/cse138_asgn4_tests/utils similarity index 100% rename from pkg/cse138_asgn3_tests/utils rename to pkg/cse138_asgn4_tests/utils diff --git a/pkg/pyproject.toml b/pkg/pyproject.toml index 3c86903905401ae5c573ebb046fdd9a360485894..17bd41e19af03119ddf4e98bf1fddc810aac1d70 100644 --- a/pkg/pyproject.toml +++ b/pkg/pyproject.toml @@ -1,12 +1,14 @@ [project] -name = "cse138-asgn3-tests" +name = "cse138-asgn4-tests" version = "0.1.0" description = "Add your description here" readme = "README.md" requires-python = ">=3.10" dependencies = [ "requests>=2.32.3", + "matplotlib>=3.4.3", + "aiohttp>=3.8.1", ] [project.scripts] -cse138-asgn3-tests = "cse138_asgn3_tests.__main__:main" +cse138-asgn4-tests = "cse138_asgn4_tests.__main__:main" diff --git a/tests/bench/benchmark.py b/tests/bench/benchmark.py new file mode 100644 index 0000000000000000000000000000000000000000..b5f664e8df754ac659c1e728d86dd549ccf3c661 --- /dev/null +++ b/tests/bench/benchmark.py @@ -0,0 +1,51 @@ +from ...utils.containers import ClusterConductor +from ...utils.testcase import TestCase +from ...utils.util import Logger +from ..helper import KVSMultiClient, KVSTestFixture +from ...utils.kvs_api import DEFAULT_TIMEOUT +import time +import asyncio +import matplotlib.pyplot as plt + +def benchmark_add_shard(conductor: ClusterConductor, dir, log: Logger): + with KVSTestFixture(conductor, dir, log, node_count=16) as fx: + conductor.add_shard("shard1", conductor.get_nodes([0])) + fx.broadcast_view(conductor.get_shard_view()) + + log("putting 100 keys\n") + put_times = [] + for i in range(1000): + c = KVSMultiClient(fx.clients, "client", log) + start_time = time.time() + r = c.put(0, f"key{i}", f"value{i}", timeout=10) + end_time = time.time() + assert r.ok, f"expected ok for new key, got {r.status_code}" + put_times.append(end_time - start_time) + + log("Starting benchmark\n") + reshard_times = [] + for shard in range(2, 17): + start_time = time.time() + log(f"adding shard{shard}\n") + conductor.add_shard(f"shard{shard}", conductor.get_nodes([shard - 1])) + asyncio.run(fx.parallel_broadcast_view(conductor.get_shard_view())) + end_time = time.time() + reshard_times.append(end_time - start_time) + log(f"reshard time with {shard} shards: {reshard_times[-1]}\n") + + log("Average put time: ", sum(put_times) / len(put_times)) + for shard, time_taken in enumerate(reshard_times, start=2): + log(f"shard count: {shard}, reshard time: {time_taken}") + + # Generate plot + plt.figure(figsize=(10, 6)) + plt.plot(range(2, 17), reshard_times, marker='o') + plt.title('Reshard Times') + plt.xlabel('Number of Shards') + plt.ylabel('Time (seconds)') + plt.grid(True) + plt.savefig(f"{dir}/reshard_times.png") + + return True, "ok" + +BENCHMARKS = [TestCase("benchmark_add_shard", benchmark_add_shard)] \ No newline at end of file diff --git a/tests/helper.py b/tests/helper.py index e1344974a471bad8032dc108b171854590827bd6..e33a803b47c78a41027ce2ab5bfaa20f77fa98ba 100644 --- a/tests/helper.py +++ b/tests/helper.py @@ -5,6 +5,7 @@ from ..utils.containers import ClusterConductor from ..utils.kvs_api import KVSClient from ..utils.util import Logger +import asyncio class KVSTestFixture: def __init__(self, conductor: ClusterConductor, dir, log: Logger, node_count: int): @@ -36,6 +37,19 @@ class KVSTestFixture: ) self.log(f"view sent to node {i}: {r.status_code} {r.text}") + async def parallel_broadcast_view(self, view: Dict[str, List[Dict[str, Any]]]): + self.log(f"\n> SEND VIEW: {view}") + + async def send_view(client: KVSClient, i: int): + r = await client.async_send_view(view) + assert ( + r.status == 200 + ), f"expected 200 to ack view, got {r.status}" + self.log(f"view sent to node {i}: {r.status} {r.text}") + + tasks = [send_view(client, i) for i, client in enumerate(self.clients)] + await asyncio.gather(*tasks) + def rebroadcast_view(self, new_view: Dict[str, List[Dict[str, Any]]]): for i, client in enumerate(self.clients): r = client.resend_last_view_with_ips_from_new_view(new_view) diff --git a/tests/shuffle/basic_shuffle.py b/tests/shuffle/basic_shuffle.py index 93e86ba0ccc4c5b47081469311227745b38a6e77..a99859c8175c1a3780059a594842e6477299da58 100644 --- a/tests/shuffle/basic_shuffle.py +++ b/tests/shuffle/basic_shuffle.py @@ -102,11 +102,7 @@ def basic_shuffle_add_remove(conductor: ClusterConductor, dir, log: Logger): res = r.json()["items"] shard2_keys = res - c.reset_model() - - assert len(shard1_keys) + len(shard2_keys) == 15, ( - f"expected 15 keys, got {len(shard1_keys) + len(shard2_keys)}" - ) + assert len(shard1_keys) + len(shard2_keys) == 15, f"expected 15 keys, got {len(shard1_keys) + len(shard2_keys)}" # Remove shard 2. This loses keys. conductor.remove_shard("shard2") diff --git a/utils/kvs_api.py b/utils/kvs_api.py index e7bcb854be5e2b4762ecbffe8de656c9fcb4ffbc..ab6eb041a2292b1b7a414f13ac163efb8473dc70 100644 --- a/utils/kvs_api.py +++ b/utils/kvs_api.py @@ -1,5 +1,6 @@ import requests from typing import Dict, Any, List +import aiohttp """ Request Timeout status code. @@ -133,6 +134,20 @@ class KVSClient: request_body = {"view": view} return requests.put(f"{self.base_url}/view", json=request_body, timeout=timeout) + async def async_send_view(self, view: dict[str, List[Dict[str, Any]]], timeout: float = DEFAULT_TIMEOUT) -> aiohttp.ClientResponse: + if not isinstance(view, dict): + raise ValueError("view must be a dict") + + self.last_view = view + request_body = {"view": view} + + async with aiohttp.ClientSession() as session: + async with session.put(f"{self.base_url}/view", json=request_body, timeout=timeout) as response: + return response + if response.status != 200: + raise RuntimeError(f"failed to send view: {response.status}") + return response + def resend_last_view_with_ips_from_new_view(self, current_view: dict[str, List[Dict[str, Any]]], timeout: float = DEFAULT_TIMEOUT) -> requests.Response: if not isinstance(current_view, dict): raise ValueError("view must be a dict")