Commit d0c99e5d authored by Thomas Dillow

Basic resharding benchmark

parent 3b7c18fa
@@ -83,6 +83,7 @@ from .tests.asgn3.eventual_consistency.convergence_basic import CONVERGENCE_TESTS
 # from .tests.asgn3.view_change.view_change_basic import VIEW_CHANGE_TESTS
 from .tests.proxy.basic_proxy import PROXY_TESTS
 from .tests.shuffle.basic_shuffle import SHUFFLE_TESTS
+from .tests.bench.benchmark import BENCHMARKS
 from .tests.stress.stress_tests import STRESS_TESTS

 TEST_SET = []
@@ -93,6 +94,7 @@ TEST_SET.extend(CAUSAL_TESTS)
 TEST_SET.extend(CONVERGENCE_TESTS)
 TEST_SET.extend(PROXY_TESTS)
 TEST_SET.extend(SHUFFLE_TESTS)
+TEST_SET.extend(BENCHMARKS)
 TEST_SET.extend(STRESS_TESTS)
 # TEST_SET.extend(VIEW_CHANGE_TESTS)
...
File moved
File moved
 [project]
-name = "cse138-asgn3-tests"
+name = "cse138-asgn4-tests"
 version = "0.1.0"
 description = "Add your description here"
 readme = "README.md"
 requires-python = ">=3.10"
 dependencies = [
     "requests>=2.32.3",
+    "matplotlib>=3.4.3",
+    "aiohttp>=3.8.1",
 ]

 [project.scripts]
-cse138-asgn3-tests = "cse138_asgn3_tests.__main__:main"
+cse138-asgn4-tests = "cse138_asgn4_tests.__main__:main"
from ...utils.containers import ClusterConductor
from ...utils.testcase import TestCase
from ...utils.util import Logger
from ..helper import KVSMultiClient, KVSTestFixture
from ...utils.kvs_api import DEFAULT_TIMEOUT

import time
import asyncio
import matplotlib.pyplot as plt


def benchmark_add_shard(conductor: ClusterConductor, dir, log: Logger):
    with KVSTestFixture(conductor, dir, log, node_count=16) as fx:
        # Start with a single one-node shard and seed it with data.
        conductor.add_shard("shard1", conductor.get_nodes([0]))
        fx.broadcast_view(conductor.get_shard_view())

        log("putting 1000 keys\n")
        put_times = []
        for i in range(1000):
            # Fresh client per request so each put is timed independently.
            c = KVSMultiClient(fx.clients, "client", log)
            start_time = time.time()
            r = c.put(0, f"key{i}", f"value{i}", timeout=10)
            end_time = time.time()
            assert r.ok, f"expected ok for new key, got {r.status_code}"
            put_times.append(end_time - start_time)

        log("Starting benchmark\n")
        reshard_times = []
        for shard in range(2, 17):
            # Add one more single-node shard and time how long the cluster
            # takes to acknowledge the new view (i.e. to reshard).
            start_time = time.time()
            log(f"adding shard{shard}\n")
            conductor.add_shard(f"shard{shard}", conductor.get_nodes([shard - 1]))
            asyncio.run(fx.parallel_broadcast_view(conductor.get_shard_view()))
            end_time = time.time()
            reshard_times.append(end_time - start_time)
            log(f"reshard time with {shard} shards: {reshard_times[-1]}\n")

        log(f"Average put time: {sum(put_times) / len(put_times)}\n")
        for shard, time_taken in enumerate(reshard_times, start=2):
            log(f"shard count: {shard}, reshard time: {time_taken}\n")

        # Plot reshard time against shard count and save it with the test output.
        plt.figure(figsize=(10, 6))
        plt.plot(range(2, 17), reshard_times, marker='o')
        plt.title('Reshard Times')
        plt.xlabel('Number of Shards')
        plt.ylabel('Time (seconds)')
        plt.grid(True)
        plt.savefig(f"{dir}/reshard_times.png")

        return True, "ok"


BENCHMARKS = [TestCase("benchmark_add_shard", benchmark_add_shard)]
\ No newline at end of file
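An aside on the timing pattern above: time.time() can move if the wall clock is adjusted mid-run, so interval measurements usually prefer the monotonic time.perf_counter(). A minimal sketch of the same start/stop pattern with perf_counter (the timed helper below is illustrative, not part of this test suite):

import time

def timed(fn, *args, **kwargs):
    # perf_counter() is monotonic and high-resolution, so the difference is a
    # reliable elapsed interval even if the system clock changes during the run.
    start = time.perf_counter()
    result = fn(*args, **kwargs)
    return result, time.perf_counter() - start

# Example: r, dt = timed(c.put, 0, "key0", "value0", timeout=10)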
@@ -5,6 +5,7 @@ from ..utils.containers import ClusterConductor
 from ..utils.kvs_api import KVSClient
 from ..utils.util import Logger
+import asyncio


 class KVSTestFixture:
     def __init__(self, conductor: ClusterConductor, dir, log: Logger, node_count: int):
@@ -36,6 +37,19 @@ class KVSTestFixture:
             )
             self.log(f"view sent to node {i}: {r.status_code} {r.text}")

+    async def parallel_broadcast_view(self, view: Dict[str, List[Dict[str, Any]]]):
+        self.log(f"\n> SEND VIEW: {view}")
+
+        async def send_view(client: KVSClient, i: int):
+            # async_send_view pre-reads the response body, so text() can still
+            # be awaited here after the underlying session has closed.
+            r = await client.async_send_view(view)
+            assert (
+                r.status == 200
+            ), f"expected 200 to ack view, got {r.status}"
+            self.log(f"view sent to node {i}: {r.status} {await r.text()}")
+
+        # Send the view to every node concurrently rather than one at a time.
+        tasks = [send_view(client, i) for i, client in enumerate(self.clients)]
+        await asyncio.gather(*tasks)
+
     def rebroadcast_view(self, new_view: Dict[str, List[Dict[str, Any]]]):
         for i, client in enumerate(self.clients):
             r = client.resend_last_view_with_ips_from_new_view(new_view)
...
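parallel_broadcast_view above follows the usual aiohttp fan-out shape: one coroutine per node, all driven concurrently by asyncio.gather. A self-contained sketch of that pattern, assuming nothing from this repo (the URLs and payload below are placeholders):

import asyncio
import aiohttp

async def broadcast(urls: list[str], payload: dict) -> list[int]:
    # One shared session; every PUT runs concurrently under gather().
    async with aiohttp.ClientSession() as session:
        async def put_one(url: str) -> int:
            async with session.put(url, json=payload,
                                   timeout=aiohttp.ClientTimeout(total=10)) as resp:
                await resp.read()  # drain the body while the connection is open
                return resp.status

        return await asyncio.gather(*(put_one(u) for u in urls))

# Example: asyncio.run(broadcast(["http://localhost:8080/view"], {"view": {}}))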
@@ -102,11 +102,7 @@ def basic_shuffle_add_remove(conductor: ClusterConductor, dir, log: Logger):
         res = r.json()["items"]
         shard2_keys = res

-        c.reset_model()
-        assert len(shard1_keys) + len(shard2_keys) == 15, (
-            f"expected 15 keys, got {len(shard1_keys) + len(shard2_keys)}"
-        )
+        assert len(shard1_keys) + len(shard2_keys) == 15, f"expected 15 keys, got {len(shard1_keys) + len(shard2_keys)}"

         # Remove shard 2. This loses keys.
         conductor.remove_shard("shard2")
...
 import requests
 from typing import Dict, Any, List
+import aiohttp

 """
 Request Timeout status code.
...
@@ -133,6 +134,20 @@ class KVSClient:
         request_body = {"view": view}
         return requests.put(f"{self.base_url}/view", json=request_body, timeout=timeout)

+    async def async_send_view(self, view: dict[str, List[Dict[str, Any]]], timeout: float = DEFAULT_TIMEOUT) -> aiohttp.ClientResponse:
+        if not isinstance(view, dict):
+            raise ValueError("view must be a dict")
+        self.last_view = view
+        request_body = {"view": view}
+        async with aiohttp.ClientSession() as session:
+            async with session.put(f"{self.base_url}/view", json=request_body, timeout=timeout) as response:
+                # Read the body before the session closes so callers can still
+                # await response.text(); fail loudly if the view was not acked.
+                await response.read()
+                if response.status != 200:
+                    raise RuntimeError(f"failed to send view: {response.status}")
+                return response
+
     def resend_last_view_with_ips_from_new_view(self, current_view: dict[str, List[Dict[str, Any]]], timeout: float = DEFAULT_TIMEOUT) -> requests.Response:
         if not isinstance(current_view, dict):
             raise ValueError("view must be a dict")
...
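One note on why async_send_view drains the response before returning: an aiohttp body is only readable while its connection is open, and read() caches it on the response object, so a later await r.text() (as in parallel_broadcast_view) still works after the session has closed. A minimal sketch of that behaviour against a placeholder URL:

import asyncio
import aiohttp

async def fetch_cached(url: str) -> str:
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as resp:
            await resp.read()  # cache the body while the socket is still open
    # The session is closed here, but text() still works because it decodes
    # the body that read() cached above.
    return await resp.text()

# Example: asyncio.run(fetch_cached("http://localhost:8080/ping"))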