diff --git a/__main__.py b/__main__.py index 5837c275bcb48f7d97eca8be4bd7f5594a94100e..4c7f1d8c585ff1267ecd761bbd44072f7b2eb400 100755 --- a/__main__.py +++ b/__main__.py @@ -76,10 +76,16 @@ add more tests by appending to this list """ from .tests.hello import hello_cluster from .tests.basic.basic import BASIC_TESTS +from .tests.asgn3.availability.availability_basic import AVAILABILITY_TESTS +from .tests.asgn3.causal_consistency.causal_basic import CAUSAL_TESTS +from .tests.asgn3.eventual_consistency.convergence_basic import CONVERGENCE_TESTS TEST_SET = [] TEST_SET.append(TestCase("hello_cluster", hello_cluster)) TEST_SET.extend(BASIC_TESTS) +TEST_SET.extend(AVAILABILITY_TESTS) +TEST_SET.extend(CAUSAL_TESTS) +TEST_SET.extend(CONVERGENCE_TESTS) # set to True to stop at the first failing test FAIL_FAST = True
diff --git a/tests/asgn3/availability/README.md b/tests/asgn3/availability/README.md new file mode 100644 index 0000000000000000000000000000000000000000..c4e6fc84d77223657483ffb44c2ac42bdc6d6b46 --- /dev/null +++ b/tests/asgn3/availability/README.md @@ -0,0 +1,51 @@ +# Availability Tests + +## Availability Basic + +### Availability Basic 1 + +#### Test Description
+This is a basic test that has two clients and two servers that are quickly partitioned from each other. Client one does a put x = 1 to server one and then reads the value back; this is expected to return the recently put value. Later in real time, client two does a read of x from server two and then a put x = 2 to server two. The expected behavior is a 404 on the read and a successful response on the put. +
+#### Explicit Violation +A violation of availability is witnessed if any read or write in this execution hangs. This is checked with a 2-second timeout (which can be adjusted). +
+#### Lamport Diagram + + +
+### Availability Basic 2 + +#### Test Description +A basic test that has three clients and three server nodes, which are partitioned from each other. Each client writes to a different node and reads from that same node after some time has passed. Each write is to the same key but has a different value. The test asserts that the reads return different values. +
+#### Explicit Violation +A violation is witnessed if any node hangs and the test times out. +
+#### Lamport Diagram + + +
+### Availability Basic 3 + +#### Test Description +A basic test that has two clients and three server nodes, which are partitioned from each other. One of the clients writes to each node. Then the other client reads these values in the same order in which the writes occurred. Each write is to the same key but has a different value. The test asserts that the reads return different values. +
+#### Explicit Violation +A violation is witnessed if any node hangs and the test times out. +
+#### Lamport Diagram + +
+## Availability Advanced + +### Availability Advanced 1 + +#### Test Description
+This test has three servers that are partitioned from each other. One writer client writes a value to each of the nodes; the expected behavior is a 201 (Created) response for each write. Then three clients each read the recently put value from one of the servers. The expected behavior is that each server returns the value that was most recently put into its KV store. +
+#### Explicit Violation +A violation of availability is witnessed if any read or write in this execution hangs. This is checked with a 2-second timeout (which can be adjusted).
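As a rough sketch of how such a timeout check can be expressed outside the actual harness, the snippet below uses plain `requests`. The base URL, the `/data/<key>` path, and the 2-second budget are assumptions made for illustration, not the harness's real API.

```
import requests

AVAILABILITY_TIMEOUT = 2.0  # seconds; adjust to match the harness setting

def check_available(base_url: str, key: str) -> bool:
    """Return True if the node answers a GET for `key` within the timeout.

    A hang (timeout) counts as an availability violation; any HTTP status
    (200, 404, ...) counts as available, because the node did reply.
    """
    try:
        requests.get(f"{base_url}/data/{key}", timeout=AVAILABILITY_TIMEOUT)
        return True
    except requests.exceptions.Timeout:
        return False

# hypothetical usage against an isolated node:
# assert check_available("http://localhost:8081", "x"), "node hung: availability violated"
```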
+ +#### Lamport Diagram + diff --git a/tests/asgn3/availability/availability_basic.py b/tests/asgn3/availability/availability_basic.py new file mode 100644 index 0000000000000000000000000000000000000000..f44c868c5273188dd5e91fc8b280c5652177fef0 --- /dev/null +++ b/tests/asgn3/availability/availability_basic.py @@ -0,0 +1,207 @@ +from typing import List, Dict, Any, Optional +import requests + +from ....utils.containers import ClusterConductor +from ....utils.util import Logger +from ....utils.kvs_api import KVSClient +from ....utils.testcase import TestCase +from ...helper import KVSTestFixture, KVSMultiClient + + +def availability_basic_1(conductor: ClusterConductor, dir, log: Logger): + with KVSTestFixture(conductor, dir, log, node_count=2) as fx: + conductor.add_shard("shard1", conductor.get_nodes([0, 1])) + fx.broadcast_view(conductor.get_shard_view()) + c1 = KVSMultiClient(fx.clients, "c1", log) + c2 = KVSMultiClient(fx.clients, "c2", log) + + log("\n> TEST AVAILABILE BASIC 1") + + # partition 0,1 + conductor.create_partition([0], "p0") + conductor.create_partition([1], "p1") + + # describe the new network topology + log("\n> NETWORK TOPOLOGY") + conductor.describe_cluster() + + # Put x = 1 on 0 (paritioned from 1) + r = c1.put(0, "x", "1") + assert r.ok, f"expected ok for new key, got {r.status_code}" + + # Get it back + r = c1.get(0, "x") + assert r.ok, f"expected ok for get, got {r.status_code}" + assert r.json()["value"] == "1", f"wrong value returned: {r.json()}" + + # Get x from 1 (should be 404) + r = c2.get(1, "x") + assert r.status_code == 404, f"expected 404 for get, got {r.status_code}" + + # Create x = 2 + r = c2.put(1, "x", "2") + assert r.ok, f"expected ok for update, got {r.status_code}" + + # return score/reason + return True, "ok" + +def availability_basic_2(conductor: ClusterConductor, dir, log: Logger): + with KVSTestFixture(conductor, dir, log, node_count=3) as fx: + conductor.add_shard("shard1", conductor.get_nodes([0, 1, 2])) + fx.broadcast_view(conductor.get_shard_view()) + c1 = KVSMultiClient(fx.clients, "c1", log) + c2 = KVSMultiClient(fx.clients, "c2", log) + c3 = KVSMultiClient(fx.clients, "c3", log) + + log("\n> TEST AVAILABLE BASIC 2") + + # partition 0,1, 2 + conductor.create_partition([0], "p0") + conductor.create_partition([1], "p1") + conductor.create_partition([2], "p2") + + # describe the new network topology + log("\n> NETWORK TOPOLOGY") + conductor.describe_cluster() + + # Put x = 1 on 0 (paritioned from everything else) + r = c1.put(0, "x", "1") + assert r.ok, f"expected ok for new key, got {r.status_code}" + + # Put x = 2 on 1 (paritioned from everything else) + r = c2.put(1, "x", "2") + assert r.ok, f"expected ok for new key, got {r.status_code}" + + # Put x = 3 on 2 (paritioned from everything else) + r = c3.put(2, "x", "3") + assert r.ok, f"expected ok for new key, got {r.status_code}" + + + # Get it back + r = c1.get(0, "x") + assert r.ok, f"expected ok for get, got {r.status_code}" + assert r.json()["value"] == "1", f"wrong value returned: {r.json()}" + + # Get it back + r = c2.get(1, "x") + assert r.ok, f"expected ok for get, got {r.status_code}" + assert r.json()["value"] == "2", f"wrong value returned: {r.json()}" + + # Get it back + r = c3.get(2, "x") + assert r.ok, f"expected ok for get, got {r.status_code}" + assert r.json()["value"] == "3", f"wrong value returned: {r.json()}" + + # return score/reason + return True, "ok" + + +def availability_basic_3(conductor: ClusterConductor, dir, log: Logger): + with KVSTestFixture(conductor, dir, 
log, node_count=3) as fx: + conductor.add_shard("shard1", conductor.get_nodes([0, 1, 2])) + fx.broadcast_view(conductor.get_shard_view()) + c1 = KVSMultiClient(fx.clients, "c1", log) + c2 = KVSMultiClient(fx.clients, "c2", log) + + log("\n> TEST AVAILABLE BASIC 3") + + # partition 0,1, 2 + conductor.create_partition([0], "p0") + conductor.create_partition([1], "p1") + conductor.create_partition([2], "p2") + + # describe the new network topology + log("\n> NETWORK TOPOLOGY") + conductor.describe_cluster() + + # Put x = 1 on 0 (paritioned from everything else) + r = c2.put(0, "x", "1") + assert r.ok, f"expected ok for new key, got {r.status_code}" + + # Put x = 2 on 1 (paritioned from everything else) + r = c2.put(1, "x", "2") + assert r.ok, f"expected ok for new key, got {r.status_code}" + + # Put x = 3 on 2 (paritioned from everything else) + r = c2.put(2, "x", "3") + assert r.ok, f"expected ok for new key, got {r.status_code}" + + + # Get it back + r = c1.get(0, "x") + assert r.ok, f"expected ok for get, got {r.status_code}" + assert r.json()["value"] == "1", f"wrong value returned: {r.json()}" + + # Get it back + r = c1.get(1, "x") + assert r.ok, f"expected ok for get, got {r.status_code}" + assert r.json()["value"] == "2", f"wrong value returned: {r.json()}" + + # Get it back + r = c1.get(2, "x") + assert r.ok, f"expected ok for get, got {r.status_code}" + assert r.json()["value"] == "3", f"wrong value returned: {r.json()}" + + # return score/reason + return True, "ok" + + +def availability_advanced_1(conductor: ClusterConductor, dir, log: Logger): + with KVSTestFixture(conductor, dir, log, node_count=3) as fx: + conductor.add_shard("shard1", conductor.get_nodes([0, 1, 2])) + fx.broadcast_view(conductor.get_shard_view()) + c1 = KVSMultiClient(fx.clients, "c1", log) # writer + c2 = KVSMultiClient(fx.clients, "c2", log) + c3 = KVSMultiClient(fx.clients, "c3", log) + c4 = KVSMultiClient(fx.clients, "c4", log) + + log("\n> TEST AVAILABILE ADVANCED 1") + + # partition 0,1 + conductor.create_partition([0], "p0") + conductor.create_partition([1], "p1") + conductor.create_partition([2], "p2") + + # describe the new network topology + log("\n> NETWORK TOPOLOGY") + conductor.describe_cluster() + + # Put x = 1 on 0 (isolated) + r = c1.put(0, "x", "1") + assert r.ok, f"expected ok for new key, got {r.status_code}" + + # Put x = 1 on 1 (isolated) + r = c1.put(1, "x", "2") + assert r.ok, f"expected ok for new key, got {r.status_code}" + + # Put x = 1 on 2 (isolated) + r = c1.put(2, "x", "3") + assert r.ok, f"expected ok for new key, got {r.status_code}" + + + # Get it back + r = c2.get(0, "x") + assert r.ok, f"expected ok for get, got {r.status_code}" + assert r.json()["value"] == "1", f"wrong value returned: {r.json()}" + + # Get it back + r = c3.get(1, "x") + assert r.ok, f"expected ok for get, got {r.status_code}" + assert r.json()["value"] == "2", f"wrong value returned: {r.json()}" + + # Get it back + r = c4.get(2, "x") + assert r.ok, f"expected ok for get, got {r.status_code}" + assert r.json()["value"] == "3", f"wrong value returned: {r.json()}" + + # return score/reason + return True, "ok" + + + +AVAILABILITY_TESTS = [ + TestCase("availability_basic_1", availability_basic_1), + TestCase("availability_basic_2", availability_basic_2), + TestCase("availability_basic_3", availability_basic_3), + TestCase("availability_advanced_1", availability_advanced_1), +] diff --git a/tests/asgn3/availability/lamport-available-advanced-1.png b/tests/asgn3/availability/lamport-available-advanced-1.png new file 
mode 100644 index 0000000000000000000000000000000000000000..9ae0c44755b1862a1cee5a0fec334cda92b2b052 Binary files /dev/null and b/tests/asgn3/availability/lamport-available-advanced-1.png differ
diff --git a/tests/asgn3/availability/lamport-available-basic-1.png b/tests/asgn3/availability/lamport-available-basic-1.png new file mode 100644 index 0000000000000000000000000000000000000000..44ab81b64ac03684fcc78edcd85dba5c433601b5 Binary files /dev/null and b/tests/asgn3/availability/lamport-available-basic-1.png differ
diff --git a/tests/asgn3/availability/lamport_availability_basic_2.jpg b/tests/asgn3/availability/lamport_availability_basic_2.jpg new file mode 100644 index 0000000000000000000000000000000000000000..549cb8f7ef8317c495c6a822e75aefbfbfd4cad9 Binary files /dev/null and b/tests/asgn3/availability/lamport_availability_basic_2.jpg differ
diff --git a/tests/asgn3/availability/lamport_availability_basic_3.jpg b/tests/asgn3/availability/lamport_availability_basic_3.jpg new file mode 100644 index 0000000000000000000000000000000000000000..542a412c4b16c83190c4ae5e2e19ecc4f4a67eea Binary files /dev/null and b/tests/asgn3/availability/lamport_availability_basic_3.jpg differ
diff --git a/tests/asgn3/basic/README.md b/tests/asgn3/basic/README.md new file mode 100644 index 0000000000000000000000000000000000000000..0b51bff1b3791e2a2b4e741eec1e639836c66b7f --- /dev/null +++ b/tests/asgn3/basic/README.md @@ -0,0 +1 @@ +# Basic
diff --git a/tests/asgn3/causal_consistency/README.md b/tests/asgn3/causal_consistency/README.md new file mode 100644 index 0000000000000000000000000000000000000000..66c27727f90f23c6ba00734f4a58c2de040b5d9c --- /dev/null +++ b/tests/asgn3/causal_consistency/README.md @@ -0,0 +1,161 @@ +# Causal Consistency + +## Causal Basic + +### Causal Basic 1: Bob Smells + +#### Test Description
+The classic "bob smells" example. This test involves three clients and two server nodes. The first client writes x to a server node. The second client reads x from the same server node and writes y to the other server node. The last client performs two reads, of y and then x, against the latter server node. The read of x should hang. +
+#### Explicit Violation +A violation would be if the last client reads anything from the server and terminates. +
+#### Lamport Diagram + +
+### Causal Basic 2: Healing Bob Smells + +#### Test Description
+The classic "bob smells" example. This test involves three clients and two server nodes. The first client writes x to a server node. The second client reads x from the same server node and writes y to the other server node. The last client performs two reads, of y and then x, against the latter server node. The read of x should first hang. Then, after the partition has healed, it should be able to read the correct value. +
+#### Explicit Violation +A violation would be if the server hangs forever or if the last read returns something other than "bob smells".
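For intuition, here is a toy model of why that final read of x must hang before the partition heals. This is only an illustration of the causal-dependency argument; the write identifiers, the dependency set, and the "HANG"/"404" markers below are invented for the sketch and are not part of any graded implementation.

```
# Toy model: a replica must not answer a read whose causal dependencies
# it has not applied yet. Everything here (ids, "HANG", "404") is invented
# purely for illustration.

class ToyReplica:
    def __init__(self):
        self.store = {}       # key -> value
        self.applied = set()  # ids of writes this replica has applied

    def put(self, key, value, write_id):
        self.store[key] = value
        self.applied.add(write_id)

    def get(self, key, client_deps):
        # if the client has observed writes this replica hasn't applied,
        # answering now could violate causal consistency -> hang instead
        if not client_deps.issubset(self.applied):
            return "HANG"
        return self.store.get(key, "404")

s0, s1 = ToyReplica(), ToyReplica()
s0.put("x", "bob smells", write_id="w1")    # alice writes x at s0
s1.put("y", "f you alice", write_id="w2")   # bob, who has read x, writes y at s1

carol_deps = set()
assert s1.get("y", carol_deps) == "f you alice"
carol_deps |= {"w2", "w1"}  # reading y makes carol depend on w1 (bob had seen x)
assert s1.get("x", carol_deps) == "HANG"    # s1 never applied w1, so it must not answer
```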
+ +#### Lamport Diagram + +
+### Causal advanced: Bob doesn't smell (old)
+This test involves 3 clients and 2 server nodes, and is a play on the traditional bob smells test. The test involves two stages: the first, where the two servers are partitioned, and the second, where the two servers can talk to each other. From now on, we will refer to the clients as ca, cb, cc, and the servers as s0, s1. The only possible keys are x and y. Note: passing this test *implicitly* expects the server to tiebreak using timestamps after the primary tiebreaking method, and it may fail if a user has implemented a different tiebreaking method. +
+The procedure is as follows: + +- ca put x in s0 (should be ok) +- ca read x from s1 (should hang) +- cb read x from s0 (should be ok) +- cb read x from s1 (should hang) +- cb write y to s1 (should be ok) +- cc write x to s1 (should be ok) +- cb read x from s1 (should be ok) +- cb write y to s0 (should be ok) +- cc read x from s0 (should hang) +- Partition Heals +- ca read x from s0 (should be ok) +- cc read x from s0 (should be ok) +- cc reads y from s0 (should be ok) +
+#### Explicit violations +A violation can be observed if any operation that is supposed to hang terminates, or any operation that is supposed to be ok hangs. Additionally, the correct and most updated value should be retrieved. +
+#### Lamport Diagram + +
+### Causal Tiebreak Basic
+Tests tiebreaking and consistency in breaking ties between concurrent writes of the same key to two partitioned servers. +Based on [Evan's pseudocode](https://discord.com/channels/1326277272566501486/1340435708254617600/1346238343330926693).
+``` +partition(a) +partition(b) +c1.put(a, "x", 1) +c2.put(b, "x", 2) +if c1.get(b, "x") times out: + c1_hang = True +if c2.get(a, "x") times out: + c2_hang = True + +partition(a, b) +xa = c3.get(a, "x") +xb = c4.get(b, "x") +assert xa == xb + +if xa == 1: + assert c1_hang and not c2_hang +if xa == 2: + assert c2_hang and not c1_hang +``` +
+#### Explicit Violations +A violation is witnessed if both or neither request hangs when c1 and c2 each issue a GET during the partition, or if the two other clients do not agree when reading the same key, in which case the tiebreak was not consistent. +
+#### Lamport Diagram + +
+### Causal advanced: Bob doesn't smell (new, updated 3/2/2025) + +#### Test Description
+This test involves 3 clients and 2 server nodes, and is a play on the traditional bob smells test. The test involves two stages: the first, where the two servers are partitioned, and the second, where the two servers can talk to each other. From now on, we will refer to the clients as ca, cb, cc, and the servers as s0, s1. The only possible keys are x and y. +
+The procedure is as follows: + +- ca put x in s0 (should be ok) +- ca read x from s1 (should hang) +- cb read x from s0 (should be ok) +- cb read x from s1 (should hang) +- cb write y to s1 (should be ok) +- cc reads y from s1 (should be ok) +- cc reads x from s0 (should be ok) +- cc write x to s1 (should be ok) +- cb read x from s1 (should not hang, to preserve availability, assuming the previous execution is successful) +- cb write y to s0 (should be ok) +- cc read x from s0 (should hang) +- *Partition Heals* +- ca read x from s0 (should be ok) +- cc read x from s0 (should be ok) +- cc reads y from s0 (should be ok) +
+#### Explicit violations +A violation can be observed if any operation that is supposed to hang terminates, or any operation that is supposed to be ok hangs. Additionally, the correct and most updated value should be retrieved. +
+#### Lamport Diagram + +
+### Causal basic: Get all keys (Updated 3/3/2025)
+This is a test that involves 3 clients and 2 server nodes and ensures causal consistency between two servers even though the servers are not able to communicate with each other. There are three components to this test. First, client 1 sends x to server 1 and y to server 2; both of these operations are expected to succeed. Second, client 1 requests all keys from both server 1 and server 2, which are both supposed to hang; client 2 requests all keys from server 1, which is expected to succeed, and from server 2, which is expected to hang. Finally, client 3 requests all keys from server 2, which is expected to pass. The validity of this test is discussed [here](https://discord.com/channels/1326277272566501486/1340435708254617600/1345999275322118228), *and updated [here](https://discord.com/channels/1326277272566501486/1340435708254617600/1346201166283018333)*. +
+This test was updated on 3/3/2025 after reconsidering whether client 3 should hang or not. As the test contributors currently understand it, the client should not hang, as its requests are equivalent to parallel gets with no causal dependencies on each other. +
+#### Explicit violations +A violation can be observed if any operation that is supposed to hang terminates, or any operation that is supposed to be ok hangs. Additionally, the correct value should be retrieved in the cases that succeed. +
+#### Lamport Diagram + +
+### Causal put no update 1-2
+We note that in a causal relationship, *reads* establish the existence of the ordering (a user can always write, but some reads may hang). This test is a twist on the traditional bob smells fua test, but instead of establishing a hang after reading y and then reading x, we note that reading x as a new client allows a 404. +
+The only difference between 1 and 2 is the existence of data in server 2 before the initial bob smells execution. +
+Updated 3/3/2025: The test is updated to accurately reflect the Lamport diagram; the only difference is the order of the writes of x to server 1 and server 2. + +
+#### Explicit Violations +Hanging where the server is not supposed to hang, or terminating where the server is supposed to hang. +
+#### Lamport Diagram + + +
+### Causal no overwrite future + +This is based on a bug I had in an earlier version of my KVS. On read, the server would completely overwrite the metadata without properly merging it. +This allows the client to read versions of a key from the past. +
+The client writes y to A, and then writes x to A, which means that x depends on the first y write. Then A and B are partitioned. The client writes y again, which happens after the first write to y. The client reads x from B, which should immediately answer with the value of x. The client then tries to read y from B, which should hang, because the client knows about a future version of y from after the partition. +
+#### Explicit Violations +The server returns the version of y from the past (the client knows about a future version).
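To make the overwrite-versus-merge distinction concrete, here is a minimal sketch. It assumes causal metadata shaped like a per-key version map (a simplification invented for this illustration, not the actual `causal-metadata` wire format): on every response the client must take the element-wise maximum of its existing metadata and the server's, rather than replacing it.

```
def merge_metadata(old: dict, new: dict) -> dict:
    """Element-wise max of two per-key version maps (assumed metadata shape)."""
    merged = dict(old)
    for key, version in new.items():
        merged[key] = max(merged.get(key, 0), version)
    return merged

# The client already knows about y at version 2 (the post-partition write) ...
client_meta = {"y": 2, "x": 1}
# ... then reads x from B, which has only seen y at version 1.
response_meta = {"y": 1, "x": 1}

# Buggy behaviour: overwriting the client's metadata forgets the future
# version of y, so a later read of y at version 1 would wrongly be accepted.
overwritten = response_meta

# Correct behaviour: merging keeps the dependency on y's future version,
# so a read of y from B must hang until B catches up.
merged = merge_metadata(client_meta, response_meta)
assert merged == {"y": 2, "x": 1}
assert overwritten["y"] < client_meta["y"]  # demonstrates the lost dependency
```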
+ +#### Lamport Diagram + diff --git a/tests/asgn3/causal_consistency/bob_smells_read.jpg b/tests/asgn3/causal_consistency/bob_smells_read.jpg new file mode 100644 index 0000000000000000000000000000000000000000..9030e6bad6cfe28d63e03dfa4a1e8c130bab50c0 Binary files /dev/null and b/tests/asgn3/causal_consistency/bob_smells_read.jpg differ diff --git a/tests/asgn3/causal_consistency/bob_smells_read_2.jpg b/tests/asgn3/causal_consistency/bob_smells_read_2.jpg new file mode 100644 index 0000000000000000000000000000000000000000..6daf553d9c30d41aefcade4ff35f55929a47b22e Binary files /dev/null and b/tests/asgn3/causal_consistency/bob_smells_read_2.jpg differ diff --git a/tests/asgn3/causal_consistency/causal_basic.py b/tests/asgn3/causal_consistency/causal_basic.py new file mode 100644 index 0000000000000000000000000000000000000000..4dadaf187904a473a80a22b9a4698994dc09869c --- /dev/null +++ b/tests/asgn3/causal_consistency/causal_basic.py @@ -0,0 +1,520 @@ +from time import sleep + +from ....utils.containers import ClusterConductor +from ....utils.testcase import TestCase +from ....utils.util import Logger +from ...helper import KVSMultiClient, KVSTestFixture + + +def causal_basic_bob_smells(conductor: ClusterConductor, dir, log: Logger): + with KVSTestFixture(conductor, dir, log, node_count=2) as fx: + conductor.add_shard("shard1", conductor.get_nodes([0, 1])) + fx.broadcast_view(conductor.get_shard_view()) + alice = KVSMultiClient(fx.clients, "alice", log) + bob = KVSMultiClient(fx.clients, "bob", log) + carol = KVSMultiClient(fx.clients, "carol", log) + + log("\n> TEST CAUSAL BASIC 1") + + # partition 0,1, 2 + conductor.create_partition([0], "p0") + conductor.create_partition([1], "p1") + + # describe the new network topology + log("\n> NETWORK TOPOLOGY") + conductor.describe_cluster() + + # alice writes x = bs to server node 0 + r = alice.put(0, "x", "bob smells") + assert r.ok, f"expected ok for new key, got {r.status_code}" + + # bob reads x from server node 0 + r = bob.get(0, "x") + assert r.ok, f"expected ok for get, got {r.status_code}" + assert r.json()["value"] == "bob smells", f"wrong value returned: {r.json()}" + + # bob writes y = fua for server node 1 + r = bob.put(1, "y", "f you alice") + assert r.ok, f"expected ok for new key, got {r.status_code}" + + # carol reads y from server node 1 + r = carol.get(1, "y") + assert r.ok, f"expected ok for get, got {r.status_code}" + assert r.json()["value"] == "f you alice", f"wrong value returned: {r.json()}" + + log("Reading x from server 1 (should hang until timeout expires)") + # carol reads x from server node 1 - should hang! 
+ r = carol.get(1, "x", timeout=10) + assert r.status_code == 408, ( + f"expected 408 (timeout) for get due to partition, got {r.status_code}" + ) + + # return score/reason + return True, "ok" + + +def causal_heal_bob_smells(conductor: ClusterConductor, dir, log: Logger): + with KVSTestFixture(conductor, dir, log, node_count=2) as fx: + conductor.add_shard("shard1", conductor.get_nodes([0, 1])) + fx.broadcast_view(conductor.get_shard_view()) + alice = KVSMultiClient(fx.clients, "alice", log) + bob = KVSMultiClient(fx.clients, "bob", log) + carol = KVSMultiClient(fx.clients, "carol", log) + + log("\n> TEST CAUSAL BASIC 2") + + # partition 0,1, 2 + conductor.my_partition([0], "base") + conductor.my_partition([1], "p1") + + # describe the new network topology + log("\n> NETWORK TOPOLOGY") + conductor.describe_cluster() + + # alice writes x = bs to server node 0 + r = alice.put(0, "x", "bob smells") + assert r.ok, f"expected ok for new key, got {r.status_code}" + + # bob reads x from server node 0 + r = bob.get(0, "x") + assert r.ok, f"expected ok for get, got {r.status_code}" + assert r.json()["value"] == "bob smells", f"wrong value returned: {r.json()}" + + # bob writes y = fua for server node 1 + r = bob.put(1, "y", "f you alice") + assert r.ok, f"expected ok for new key, got {r.status_code}" + + # carol reads y from server node 1 + r = carol.get(1, "y") + assert r.ok, f"expected ok for get, got {r.status_code}" + assert r.json()["value"] == "f you alice", f"wrong value returned: {r.json()}" + + log("Reading x from server 1 (should hang until timeout expires)") + # carol reads x from server node 1 - should hang! + r = carol.get(1, "x", timeout=10) + assert r.status_code == 408, ( + f"expected 408 (timeout) for get due to partition, got {r.status_code}" + ) + + conductor.my_partition([0, 1], "base") + log("created new parititon and waiting for sync...") + sleep(10) + + r = carol.get(1, "x", timeout=10) + assert r.ok, f"expected ok for get, got {r.status_code}" + assert r.json()["value"] == "bob smells", f"wrong value returned: {r.json()}" + + # return score/reason + return True, "ok" + + +def causal_bob_doesnt_smell_old(conductor, dir, log): + with KVSTestFixture(conductor, dir, log, node_count=2) as fx: + conductor.add_shard("shard1", conductor.get_nodes([0, 1])) + fx.broadcast_view(conductor.get_shard_view()) + alice = KVSMultiClient(fx.clients, "alice", log) + bob = KVSMultiClient(fx.clients, "bob", log) + carol = KVSMultiClient(fx.clients, "carol", log) + + log("\n> TEST CAUSAL BASIC 2") + + # partition 0,1 + conductor.my_partition([0], "base") + conductor.my_partition([1], "p1") + + # describe the new network topology + log("\n> NETWORK TOPOLOGY") + conductor.describe_cluster() + + r = alice.put(0, "x", "bob smells") + assert r.ok, f"expected ok for new key, got {r.status_code}" + log("Reading x from server 1 (should hang until timeout expires)") + r = alice.get(1, "x", timeout=10) + assert r.status_code == 408, ( + f"expected 408 (timeout) for get due to partition, got {r.status_code}" + ) + + r = bob.get(0, "x") + assert r.ok, f"expected ok for get, got {r.status_code}" + assert r.json()["value"] == "bob smells", f"wrong value returned: {r.json()}" + log("Reading x from server 1 (should hang until timeout expires)") + r = bob.get(1, "x", timeout=10) + assert r.status_code == 408, ( + f"expected 408 (timeout) for get due to partition, got {r.status_code}" + ) + r = bob.put(1, "y", "f you alice") + assert r.ok, f"expected ok for new key, got {r.status_code}" + + r = carol.put(1, "x", 
"bob doesn't smell") + assert r.ok, f"expected ok for new key, got {r.status_code}" + r = bob.get(1, "x") + assert r.ok, f"expected ok for get, got {r.status_code}" + assert r.json()["value"] == "bob doesn't smell", ( + f"wrong value returned: {r.json()}" + ) + r = bob.put(0, "y", "thanks carol") + assert r.ok, f"expected ok for new key, got {r.status_code}" + + log( + "Reading x from server 0 after writing x to server 1 (should hang until timeout expires)" + ) + r = carol.get(0, "x", timeout=10) + assert r.status_code == 408, ( + f"expected 408 (timeout) for get due to partition, got {r.status_code}" + ) + + conductor.my_partition([0, 1], "base") + log("created new parititon and waiting for sync...") + sleep(10) + + r = alice.get(0, "x") + assert r.ok, f"expected ok for get, got {r.status_code}" + assert r.json()["value"] == "bob doesn't smell", ( + f"wrong value returned: {r.json()}" + ) + r = carol.get(0, "x") + assert r.ok, f"expected ok for get, got {r.status_code}" + assert r.json()["value"] == "bob doesn't smell", ( + f"wrong value returned: {r.json()}" + ) + log("Reading y from server 0(should NOT hang)") + r = carol.get(1, "y") + assert r.ok, f"expected ok for get, got {r.status_code}" + assert r.json()["value"] == "thanks carol", f"wrong value returned: {r.json()}" + return True, "ok" + + +def causal_bob_doesnt_smell_updated(conductor, dir, log): + with KVSTestFixture(conductor, dir, log, node_count=2) as fx: + conductor.add_shard("shard1", conductor.get_nodes([0, 1])) + fx.broadcast_view(conductor.get_shard_view()) + alice = KVSMultiClient(fx.clients, "alice", log) + bob = KVSMultiClient(fx.clients, "bob", log) + carol = KVSMultiClient(fx.clients, "carol", log) + + log("\n> TEST CAUSAL BASIC 2") + + # partition 0,1 + conductor.my_partition([0], "base") + conductor.my_partition([1], "p1") + + # describe the new network topology + log("\n> NETWORK TOPOLOGY") + conductor.describe_cluster() + + r = alice.put(0, "x", "bob smells") + assert r.ok, f"expected ok for new key, got {r.status_code}" + log("Reading x from server 1 (should hang until timeout expires)") + r = alice.get(1, "x", timeout=10) + assert r.status_code == 408, ( + f"expected 408 (timeout) for get due to partition, got {r.status_code}" + ) + + r = bob.get(0, "x") + assert r.ok, f"expected ok for get, got {r.status_code}" + assert r.json()["value"] == "bob smells", f"wrong value returned: {r.json()}" + log("Reading x from server 1 (should hang until timeout expires)") + r = bob.get(1, "x", timeout=10) + assert r.status_code == 408, ( + f"expected 408 (timeout) for get due to partition, got {r.status_code}" + ) + r = bob.put(1, "y", "f you alice") + assert r.ok, f"expected ok for new key, got {r.status_code}" + + r = carol.get(1, "y") + assert r.ok, f"expected ok for get, got {r.status_code}" + r = carol.get(0, "x") + assert r.ok, f"expected ok for get, got {r.status_code}" + r = carol.put(1, "x", "bob doesn't smell") + assert r.ok, f"expected ok for new key, got {r.status_code}" + r = bob.get(1, "x") + assert r.ok, f"expected ok for get, got {r.status_code}" + assert r.json()["value"] == "bob doesn't smell", ( + f"wrong value returned: {r.json()}" + ) + r = bob.put(0, "y", "thanks carol") + assert r.ok, f"expected ok for new key, got {r.status_code}" + + log( + "Reading x from server 0 after writing x to server 1 (should hang until timeout expires)" + ) + r = carol.get(0, "x", timeout=10) + assert r.status_code == 408, ( + f"expected 408 (timeout) for get due to partition, got {r.status_code}" + ) + + 
conductor.my_partition([0, 1], "base") + log("created new parititon and waiting for sync...") + sleep(10) + + r = alice.get(0, "x") + assert r.ok, f"expected ok for get, got {r.status_code}" + assert r.json()["value"] == "bob doesn't smell", ( + f"wrong value returned: {r.json()}" + ) + r = carol.get(0, "x") + assert r.ok, f"expected ok for get, got {r.status_code}" + assert r.json()["value"] == "bob doesn't smell", ( + f"wrong value returned: {r.json()}" + ) + log("Reading y from server 0(should NOT hang)") + r = carol.get(1, "y") + assert r.ok, f"expected ok for get, got {r.status_code}" + assert r.json()["value"] == "thanks carol", f"wrong value returned: {r.json()}" + return True, "ok" + + +def causal_get_all_1(conductor, dir, log): + with KVSTestFixture(conductor, dir, log, node_count=2) as fx: + conductor.add_shard("shard1", conductor.get_nodes([0, 1])) + fx.broadcast_view(conductor.get_shard_view()) + conductor.my_partition([0], "p0") + conductor.my_partition([1], "p1") + alice = KVSMultiClient(fx.clients, "alice", log) + bob = KVSMultiClient(fx.clients, "bob", log) + carol = KVSMultiClient(fx.clients, "carol", log) + + r = alice.put(0, "x", "bob smells") + assert r.ok, f"expected ok for new key, got {r.status_code}" + r = alice.put(1, "y", "bob still smells! stinky :( ") + assert r.ok, f"expected ok for new key, got {r.status_code}" + log("Reading all from server 1 (should hang until timeout expires)") + r = alice.get_all(0) + assert r.status_code == 408, ( + f"expected 408 (timeout) for get_all due to partition, got {r.status_code}" + ) + log("Reading all from server 1 (should hang until timeout expires)") + r = alice.get_all(1) + assert r.status_code == 408, ( + f"expected 408 (timeout) for get_all due to partition, got {r.status_code}" + ) + + r = bob.get_all(0) + assert r.ok, f"expected ok for get_all, got {r.status_code}" + assert r.json()["items"] == {"x": "bob smells"}, ( + f"wrong value returned: {r.json()}" + ) + log("Reading all from server 1 (should hang until timeout expires)") + r = bob.get_all(1, timeout=10) + assert r.status_code == 408, ( + f"expected 408 (timeout) for get_all due to partition, got {r.status_code}" + ) + r = carol.get_all(1, timeout=10) + assert r.ok, f"expected ok for get_all, got {r.status_code}" + assert r.json()["items"] == {"y": "bob still smells! 
stinky :( "}, ( + f"wrong value returned: {r.json()}" + ) + return True, "ok" + + +def causal_bob_smells_read(conductor, dir, log): + with KVSTestFixture(conductor, dir, log, node_count=2) as fx: + conductor.add_shard("shard1", conductor.get_nodes([0, 1])) + fx.broadcast_view(conductor.get_shard_view()) + alice = KVSMultiClient(fx.clients, "alice", log) + bob = KVSMultiClient(fx.clients, "bob", log) + carol = KVSMultiClient(fx.clients, "carol", log) + + log("\n> TEST CAUSAL PUT NO UPDATE") + + # partition 0,1, 2 + conductor.create_partition([0], "p0") + conductor.create_partition([1], "p1") + + # describe the new network topology + log("\n> NETWORK TOPOLOGY") + conductor.describe_cluster() + + # alice writes x = bs to server node 0 + r = alice.put(0, "x", "bob smells") + assert r.ok, f"expected ok for new key, got {r.status_code}" + + # bob reads x from server node 0 + r = bob.get(0, "x") + assert r.ok, f"expected ok for get, got {r.status_code}" + assert r.json()["value"] == "bob smells", f"wrong value returned: {r.json()}" + + # bob writes y = fua for server node 1 + r = bob.put(1, "y", "f you alice") + assert r.ok, f"expected ok for new key, got {r.status_code}" + + # carol reads y from server node 1 + r = carol.get(1, "x") + assert r.status_code == 404, f"expected 404 for get, got {r.status_code}" + + log("Reading x from server 1 (should hang until timeout expires)") + r = carol.get(1, "y", timeout=10) + assert r.ok, f"expected ok for get, got {r.status_code}" + assert r.json()["value"] == "f you alice", f"wrong value returned: {r.json()}" + + # carol reads x from server node 1 - should hang! + r = carol.get(1, "x", timeout=10) + assert r.status_code == 408, ( + f"expected 408 (timeout) for get due to partition, got {r.status_code}" + ) + + # return score/reason + return True, "ok" + + +def causal_bob_smells_read_2(conductor, dir, log): + with KVSTestFixture(conductor, dir, log, node_count=2) as fx: + conductor.add_shard("shard1", conductor.get_nodes([0, 1])) + fx.broadcast_view(conductor.get_shard_view()) + alice = KVSMultiClient(fx.clients, "alice", log) + bob = KVSMultiClient(fx.clients, "bob", log) + carol = KVSMultiClient(fx.clients, "carol", log) + + log("\n> TEST CAUSAL PUT NO UPDATE") + + # partition 0,1, 2 + conductor.create_partition([0], "p0") + conductor.create_partition([1], "p1") + + # describe the new network topology + log("\n> NETWORK TOPOLOGY") + conductor.describe_cluster() + + # alice initializes x = bss to server 1 + r = alice.put(1, "x", "bob still smells") + assert r.ok, f"expected ok for new key, got {r.status_code}" + # alice writes x = bs to server node 0 + r = alice.put(0, "x", "bob smells") + assert r.ok, f"expected ok for new key, got {r.status_code}" + + # bob reads x from server node 0 + r = bob.get(0, "x") + assert r.ok, f"expected ok for get, got {r.status_code}" + assert r.json()["value"] == "bob smells", f"wrong value returned: {r.json()}" + + # bob writes y = fua for server node 1 + r = bob.put(1, "y", "f you alice") + assert r.ok, f"expected ok for new key, got {r.status_code}" + + # carol reads y from server node 1 + r = carol.get(1, "x") + assert r.ok, f"expected ok for new key, got {r.status_code}" + assert r.json()["value"] == "bob still smells", ( + f"expected value differs, expected 'bob still smells', got {r.json()['value']}" + ) + + r = carol.get(1, "y", timeout=10) + assert r.ok, f"expected ok for get, got {r.status_code}" + assert r.json()["value"] == "f you alice", f"wrong value returned: {r.json()}" + + log("Reading x from server 1 
(should hang until timeout expires)") + # carol reads x from server node 1 - should hang! + r = carol.get(1, "x", timeout=10) + assert r.status_code == 408, ( + f"expected 408 (timeout) for get due to partition, got {r.status_code}" + ) + + # return score/reason + return True, "ok" + +def causal_basic_tiebreak(conductor, dir, log): + with KVSTestFixture(conductor, dir, log, node_count=2) as fx: + conductor.add_shard("shard1", conductor.get_nodes([0, 1])) + fx.broadcast_view(conductor.get_shard_view()) + alice = KVSMultiClient(fx.clients, "alice", log) + bob = KVSMultiClient(fx.clients, "bob", log) + carol = KVSMultiClient(fx.clients, "carol", log) + david = KVSMultiClient(fx.clients, "david", log) + + log("\n> TEST CAUSAL BASIC tiebreak") + + # partition 0,1, 2 + conductor.create_partition([0], "p0") + conductor.create_partition([1], "p1") + + # describe the new network topology + log("\n> NETWORK TOPOLOGY") + conductor.describe_cluster() + + r = alice.put(0, "x", "1") + assert r.ok, f"expected ok for new key, got {r.status_code}" + + r = bob.put(1, "x", "2") + assert r.ok, f"expected ok for new key, got {r.status_code}" + + r = alice.get(1, "x") + alice_hang = r.status_code == 408 + r = bob.get(0, "x") + bob_hang = r.status_code == 408 + + conductor.create_partition([0, 1], "base") + sleep(10) + + + a = carol.get(0, "x") + assert a.ok, f"expected ok for get, got {a.status_code}" + b = david.get(1, "x") + assert b.ok, f"expected ok for get, got {b.status_code}" + + assert a.json()["value"] == b.json()["value"], f"expected {a} == {b}" + + if a.json()["value"] == "1": + assert alice_hang, f"expected alice to hang" + assert not bob_hang, f"expected bob to not hang" + if a.json()["value"] == "2": + assert not alice_hang, f"expected alice to not hang" + assert bob_hang, f"expected bob to hang" + + return True, "ok" + + + +def causal_no_overwrite_future(conductor: ClusterConductor, dir: str, log: Logger): + with KVSTestFixture(conductor, dir, log, node_count=2) as fx: + conductor.add_shard("shard1", conductor.get_nodes([0, 1])) + fx.broadcast_view(conductor.get_shard_view()) + a = 0 + b = 1 + c = KVSMultiClient(fx.clients, "c", log) + conductor.my_partition([a, b], "base") + + log("\n> TEST CAUSAL NO OVERWRITE FUTURE") + + log("\n> NETWORK TOPOLOGY") + conductor.describe_cluster() + + assert c.put(a, "y", "old").ok, "expected ok for new key" + assert c.put(a, "x", "1").ok, "expected ok for new key" + + log("waiting for sync before partitioning") + sleep(10) + + assert c.get(b, "y").ok, "expected ok for get" + assert c.get(b, "x").ok, "expected ok for get" + + conductor.my_partition([a], "a") + conductor.my_partition([b], "b") + log("Partitioned a and b") + log("\n> NETWORK TOPOLOGY") + conductor.describe_cluster() + + assert c.put(a, "y", "new").ok, "expected ok for updated key" + + r = c.get(b, "x") + assert r.ok, f"expected ok for get, got {r.status_code}" + assert r.json()["value"] == "1", f"wrong value returned: {r.json()}" + + r = c.get(b, "y") + assert r.status_code == 408, ( + f"expected timeout due to missing dependency {r.status_code}" + ) + + return True, "ok" + + +CAUSAL_TESTS = [ + TestCase("causal_basic_bob_smells", causal_basic_bob_smells), + TestCase("causal_heal_bob_smells", causal_heal_bob_smells), + TestCase("causal_bob_doesnt_smell_old", causal_bob_doesnt_smell_old), + TestCase("causal_bob_doesnt_smell_updated", causal_bob_doesnt_smell_updated), + TestCase("causal_basic_tiebreak", causal_basic_tiebreak), + TestCase("causal_get_all_1", causal_get_all_1), + 
TestCase("causal_bob_smells_read", causal_bob_smells_read), + TestCase("causal_bob_smells_read_2", causal_bob_smells_read_2), + TestCase("causal_no_overwrite_future", causal_no_overwrite_future), +] diff --git a/tests/asgn3/causal_consistency/lamport_causal_basic_tiebreak.jpg b/tests/asgn3/causal_consistency/lamport_causal_basic_tiebreak.jpg new file mode 100644 index 0000000000000000000000000000000000000000..9e54623cea937194202e051486433b037efa7729 Binary files /dev/null and b/tests/asgn3/causal_consistency/lamport_causal_basic_tiebreak.jpg differ diff --git a/tests/asgn3/causal_consistency/lamport_causal_bob_doesnt_smell.jpg b/tests/asgn3/causal_consistency/lamport_causal_bob_doesnt_smell.jpg new file mode 100644 index 0000000000000000000000000000000000000000..b309b500ad85de5a608a7fbede123399145131e5 Binary files /dev/null and b/tests/asgn3/causal_consistency/lamport_causal_bob_doesnt_smell.jpg differ diff --git a/tests/asgn3/causal_consistency/lamport_causal_bob_doesnt_smell_old.jpg b/tests/asgn3/causal_consistency/lamport_causal_bob_doesnt_smell_old.jpg new file mode 100644 index 0000000000000000000000000000000000000000..ed815a8a0395738dfbe40d5c4d677911a9ac5c44 Binary files /dev/null and b/tests/asgn3/causal_consistency/lamport_causal_bob_doesnt_smell_old.jpg differ diff --git a/tests/asgn3/causal_consistency/lamport_causal_bob_smells.jpg b/tests/asgn3/causal_consistency/lamport_causal_bob_smells.jpg new file mode 100644 index 0000000000000000000000000000000000000000..4200a712910165cd8fc04f0691907b64effa8b82 Binary files /dev/null and b/tests/asgn3/causal_consistency/lamport_causal_bob_smells.jpg differ diff --git a/tests/asgn3/causal_consistency/lamport_causal_get_all_1.jpg b/tests/asgn3/causal_consistency/lamport_causal_get_all_1.jpg new file mode 100644 index 0000000000000000000000000000000000000000..239853168fecf1a7ad4b41e42d779d13f5f6ca9c Binary files /dev/null and b/tests/asgn3/causal_consistency/lamport_causal_get_all_1.jpg differ diff --git a/tests/asgn3/causal_consistency/lamport_causal_heal_bob_smells.jpg b/tests/asgn3/causal_consistency/lamport_causal_heal_bob_smells.jpg new file mode 100644 index 0000000000000000000000000000000000000000..1075db94974495b3be2f98cfd84fc2391d1f2550 Binary files /dev/null and b/tests/asgn3/causal_consistency/lamport_causal_heal_bob_smells.jpg differ diff --git a/tests/asgn3/causal_consistency/no_overwrite_future.jpg b/tests/asgn3/causal_consistency/no_overwrite_future.jpg new file mode 100644 index 0000000000000000000000000000000000000000..ba5f8cb1e711a7f406316c554e0cab7a22907c5a Binary files /dev/null and b/tests/asgn3/causal_consistency/no_overwrite_future.jpg differ diff --git a/tests/asgn3/eventual_consistency/README.md b/tests/asgn3/eventual_consistency/README.md new file mode 100644 index 0000000000000000000000000000000000000000..0c19a5181e86d8c9ca3ca12dc1ff42d9c18d6f63 --- /dev/null +++ b/tests/asgn3/eventual_consistency/README.md @@ -0,0 +1,62 @@ +# Eventual Consistency + +## Eventual Consistency Basic + +### Convergence Concurrent Basic 1 + +#### Test Description +This is a basic test that has 4 clients and 3 servers, with no partitions. 3 clients perform exactly one write to a unique server, with the 4th client reading the value from all 3 servers 10 seconds (which can be modified) after the final write. The requests are concurrent and the time limit for eventual consistency has elapsed, so any value written to a server is valid, as long as all three servers agree. 
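The tests below wait out the convergence window with a fixed `CONVERGENCE_TIME` sleep. An alternative, sketched here with plain `requests` and hypothetical node URLs (the endpoint shape is an assumption, not the harness API), is to poll until the replicas agree or a deadline passes, which finishes early when convergence is fast.

```
import time
import requests

def wait_for_agreement(node_urls, key, deadline_s=10.0, poll_s=0.5):
    """Poll each node for `key` until all return the same value or time runs out.

    Returns the agreed value, or None if the nodes never converged in time.
    """
    deadline = time.monotonic() + deadline_s
    while time.monotonic() < deadline:
        values = []
        for url in node_urls:
            r = requests.get(f"{url}/data/{key}", timeout=2)
            values.append(r.json().get("value") if r.ok else None)
        if values and len(set(values)) == 1 and values[0] is not None:
            return values[0]
        time.sleep(poll_s)
    return None

# hypothetical usage:
# agreed = wait_for_agreement(
#     ["http://localhost:8081", "http://localhost:8082", "http://localhost:8083"], "x")
# assert agreed is not None, "servers never converged on x"
```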
+ +#### Explicit Violation +A violation of eventual consistency is witnessed if at least two servers disagree about the value of x. +
+#### Lamport Diagram + +
+### Convergence Happens Before Basic 1 + +#### Test Description +A single client performs 3 sequential writes to 3 unique servers. After the writes complete, the same client reads from each server and should get the same result from each server. +
+#### Explicit Violation +The client receives different values of x from any 2 servers. +
+#### Lamport Diagram + +
+### Convergence Get All 1 (basic) + +#### Test Description +A basic test for convergence using the `GET /data/` endpoint. We have 2 clients and 2 servers. The order of operations is: + +1. Partition({A}) +2. Partition({B}) +3. c1.PUT(A, x=alice) +4. c1.PUT(B, z=testing123) +5. c2.PUT(B, x=bob) +6. c2.PUT(B, y=builder) +7. c2.GETALL(A), expect 408/timeout +8. Partition({A, B}), Sleep(10) +9. c2.GETALL(A) +10. c2.GETALL(B) +11. c1.GETALL(A) +12. c1.GETALL(B) +
+The test checks whether the states returned by the final 4 GETALL calls are equivalent, i.e., whether the values on both servers have properly converged. +
+#### Explicit violation +A violation is witnessed if a client times out during a read, any PUT fails, or the final KV store states are inconsistent. +
+#### Lamport Diagram + +
+### Convergence Get All 2 (advanced) + +#### Test Description +The test starts with three servers that are aware of each other yet unable to communicate. There are three writer clients and three reader clients. The writer clients write differently to the three servers, using the same keys with different values. The reader clients then read the data from the three servers separately. Finally, the partition heals and the servers can communicate with each other again. The reader clients then read the data from the servers again and ensure that the data is consistent. +
+#### Explicit Violation +A violation is witnessed if the reader clients hang on any read, or the data is inconsistent in the end.
+ +#### Lamport Diagram + diff --git a/tests/asgn3/eventual_consistency/convergence_basic.py b/tests/asgn3/eventual_consistency/convergence_basic.py new file mode 100644 index 0000000000000000000000000000000000000000..9d93d8c112d365c5d2f98b3506dd951096b5425e --- /dev/null +++ b/tests/asgn3/eventual_consistency/convergence_basic.py @@ -0,0 +1,267 @@ +import time + +from ....utils.containers import ClusterConductor +from ....utils.testcase import TestCase +from ....utils.util import Logger +from ...helper import KVSMultiClient, KVSTestFixture + +CONVERGENCE_TIME = 10 + + +def convergence_concurrent_basic_1(conductor: ClusterConductor, dir, log: Logger): + global CONVERGENCE_TIME + with KVSTestFixture(conductor, dir, log, node_count=3) as fx: + conductor.add_shard("shard1", conductor.get_nodes([0, 1, 2])) + fx.broadcast_view(conductor.get_shard_view()) + c1 = KVSMultiClient(fx.clients, "c1", log) + c2 = KVSMultiClient(fx.clients, "c2", log) + c3 = KVSMultiClient(fx.clients, "c3", log) + c4 = KVSMultiClient(fx.clients, "c4", log) + + log("\n> TEST CONVERGENCE CONCURRENT BASIC 1") + + # describe the new network topology + log("\n> NETWORK TOPOLOGY") + conductor.describe_cluster() + + # Put x = 0 on 0 + r = c1.put(0, "x", "0") + assert r.ok, f"expected successful response for new key, got {r.status_code}" + + # Put x = 1 on 1 + r = c2.put(1, "x", "1") + assert r.ok, f"expected successful response, got {r.status_code}" + + # Put x = 2 on 2 + r = c3.put(2, "x", "2") + assert r.ok, f"expected successful response, got {r.status_code}" + + # Wait 10 seconds (the deadline for the eventual consistency) + log(f"\n> WAITING {CONVERGENCE_TIME} SECONDS") + time.sleep(CONVERGENCE_TIME) + + # Get them back, they should all match. + r = c4.get(0, "x") + assert r.status_code == 200, f"expected 200 for get, got {r.status_code}" + result = r.json()["value"] + + r = c4.get(1, "x") + assert r.status_code == 200, f"expected 200 for get, got {r.status_code}" + assert r.json()["value"] == result, f"Results are not consistent. Expected { + result + }, got {r.json()}(Comparison on value)" + + r = c4.get(2, "x") + assert r.status_code == 200, f"expected 200 for get, got {r.status_code}" + assert r.json()["value"] == result, f"Results are not consistent. Expected { + result + }, got {r.json()}(Comparison on value)" + + # return score/reason + return True, "ok" + + +def convergence_happens_before_basic_1(conductor: ClusterConductor, dir, log: Logger): + with KVSTestFixture(conductor, dir, log, node_count=3) as fx: + conductor.add_shard("shard1", conductor.get_nodes([0, 1, 2])) + fx.broadcast_view(conductor.get_shard_view()) + c1 = KVSMultiClient(fx.clients, "c1", log) + + log("\n> TEST CONVERGENCE HAPPENS BEFORE BASIC 1") + + # describe the new network topology + log("\n> NETWORK TOPOLOGY") + conductor.describe_cluster() + + # Put x = 0 on 0 + r = c1.put(0, "x", "0") + assert r.ok, f"expected successful response for new key, got {r.status_code}" + + # Put x = 1 on 1 + r = c1.put(1, "x", "1") + assert r.ok, f"expected successful response, got {r.status_code}" + + # Put x = 2 on 2 + r = c1.put(2, "x", "2") + assert r.ok, f"expected successful response, got {r.status_code}" + + r = c1.get(0, "x") + assert r.status_code == 200, f"expected 200 for get, got {r.status_code}" + result = r.json()["value"] + + r = c1.get(1, "x") + assert r.status_code == 200, f"expected 200 for get, got {r.status_code}" + assert r.json()["value"] == result, f"Results are not consistent. 
Expected { + result + }, got {r.json()}(Comparison on value)" + + r = c1.get(2, "x") + assert r.status_code == 200, f"expected 200 for get, got {r.status_code}" + assert r.json()["value"] == result, f"Results are not consistent. Expected { + result + }, got {r.json()} (Comparison on value)" + + # return score/reason + return True, "ok" + +# basic +def convergence_get_all_1(conductor: ClusterConductor, dir, log: Logger): + with KVSTestFixture(conductor, dir, log, node_count=2) as fx: + conductor.add_shard("shard1", conductor.get_nodes([0, 1])) + fx.broadcast_view(conductor.get_shard_view()) + alice = KVSMultiClient(fx.clients, "alice", log) + bob = KVSMultiClient(fx.clients, "bob", log) + + log("\n> TEST CONVERGENCE GETALL 1") + + # starts off with no partitions + conductor.my_partition([0], "part1") + conductor.my_partition([1], "part2") + # conductor.my_partition([0,1], "part1") + log("Created initial partitions: {0}, {1}") + + # describe the new network topology + log("\n> NETWORK TOPOLOGY") + conductor.describe_cluster() + + r = alice.put(0, "x", "alice") + assert r.ok, f"expected ok for new key, got {r.status_code}" + + r = alice.put(1, "z", "testing123") + assert r.ok, f"expected ok for new key, got {r.status_code}" + + r = bob.put(1, "x", "bob") + assert r.ok, f"expected ok for new key, got {r.status_code}" + + r = bob.put(1, "y", "builder") + assert r.ok, f"expected ok for new key, got {r.status_code}" + + # should hang here + r = bob.get_all(0) + assert r.status_code == 408, ( + f"expected 408 for get_all, got {r.status_code}" + ) + + # heal partition + conductor.my_partition([0, 1], "base") + log("Partition healed: {0, 1}") + time.sleep(10) + # sleep to allow for convergence + + r = bob.get_all(0) + assert r.ok, f"expected ok for get_all, got {r.status_code}" + KVS_state_0 = r.json()["items"] + + r = bob.get_all(1) + assert r.ok, f"expected ok for get_all, got {r.status_code}" + KVS_state_1 = r.json()["items"] + + assert KVS_state_0 == KVS_state_1, ( + f"KVS states are not equal to Bob: {KVS_state_0}, {KVS_state_1}" + ) + + r = alice.get_all(1) + assert r.ok, f"expected ok for get_all, got {r.status_code}" + KVS_state_2 = r.json()["items"] + + r = alice.get_all(0) + assert r.ok, f"expected ok for get_all, got {r.status_code}" + KVS_state_3 = r.json()["items"] + + assert KVS_state_0 == KVS_state_1 == KVS_state_2 == KVS_state_3, ( + f"KVS states are not equal to Alice: {KVS_state_0}, {KVS_state_1}, {KVS_state_2}, {KVS_state_3}" + ) + + return True, "ok" + +# advanced +def convergence_get_all_2(conductor: ClusterConductor, dir, log: Logger): + with KVSTestFixture(conductor, dir, log, node_count=3) as fx: + conductor.add_shard("shard1", conductor.get_nodes([0, 1, 2])) + fx.broadcast_view(conductor.get_shard_view()) + alice = KVSMultiClient(fx.clients, "alice", log) + bob = KVSMultiClient(fx.clients, "bob", log) + carol = KVSMultiClient(fx.clients, "carol", log) + + c1 = KVSMultiClient(fx.clients, "c1", log) + c2 = KVSMultiClient(fx.clients, "c2", log) + c3 = KVSMultiClient(fx.clients, "c3", log) + + log("\n> TEST CONVERGENCE GET ALL 2") + + # partition 0, 1, 2 + conductor.create_partition([0], "p0") + conductor.create_partition([1], "p1") + conductor.create_partition([2], "p2") + + log("\n> NETWORK TOPOLOGY") + conductor.describe_cluster() + + # alice writes x, y, z to server 0 + r = alice.put(0, "x", "alice") + assert r.ok, f"expected ok for new key, got {r.status_code}" + r = alice.put(0, "y", "alice") + assert r.ok, f"expected ok for new key, got {r.status_code}" + r = alice.put(0, 
"z", "alice") + assert r.ok, f"expected ok for new key, got {r.status_code}" + + # bob writes x,y,z to server 1 + r = bob.put(1, "x", "bob") + assert r.ok, f"expected ok for new key, got {r.status_code}" + r = bob.put(1, "y", "bob") + assert r.ok, f"expected ok for new key, got {r.status_code}" + r = bob.put(1, "z", "bob") + assert r.ok, f"expected ok for new key, got {r.status_code}" + + # carol writes x, y, z to server 2 + r = carol.put(2, "x", "carol") + assert r.ok, f"expected ok for new key, got {r.status_code}" + r = carol.put(2, "y", "carol") + assert r.ok, f"expected ok for new key, got {r.status_code}" + r = carol.put(2, "z", "carol") + assert r.ok, f"expected ok for new key, got {r.status_code}" + + r = c1.get_all(0) + assert r.status_code == 200, f"expected 200 for get_all, got {r.status_code}" + assert r.json()["items"] == {"x": "alice", "y": "alice", "z": "alice"}, ( + f"expected {{'x': 'alice', 'y': 'alice', 'z': 'alice'}}, got {r.json()}" + ) + + r = c2.get_all(1) + assert r.status_code == 200, f"expected 200 for get_all, got {r.status_code}" + assert r.json()["items"] == {"x": "bob", "y": "bob", "z": "bob"}, ( + f"expected {{'x': 'bob', 'y': 'bob', 'z': 'bob'}}, got {r.json()}" + ) + + r = c3.get_all(2) + assert r.status_code == 200, f"expected 200 for get_all, got {r.status_code}" + assert r.json()["items"] == {"x": "carol", "y": "carol", "z": "carol"}, ( + f"expected {{'x': 'carol', 'y': 'carol', 'z': 'carol'}}, got {r.json()}" + ) + + conductor.create_partition([0, 1, 2], "base") + log("created new partition and waiting for sync...") + time.sleep(10) + + r1 = c1.get_all(0) + assert r.status_code == 200, f"expected 200 for get_all, got {r.status_code}" + r2 = c2.get_all(1) + assert r.status_code == 200, f"expected 200 for get_all, got {r.status_code}" + r3 = c3.get_all(2) + assert r.status_code == 200, f"expected 200 for get_all, got {r.status_code}" + assert ( + r1.json()["items"] == r2.json()["items"] == r3.json()["items"] + ), f"expected all responses to be equal, got \n {r1.json()['items']}, \n{ + r2.json()['items'] + }, \n{r3.json()['items']}" + return True, "ok" + + + + +CONVERGENCE_TESTS = [ + TestCase("convergence_concurrent_basic_1", convergence_concurrent_basic_1), + TestCase("convergence_happens_before_basic_1", convergence_happens_before_basic_1), + TestCase("convergence_get_all_1", convergence_get_all_1), + TestCase("convergence_get_all_2", convergence_get_all_2), +] diff --git a/tests/asgn3/eventual_consistency/lamport-convergence-concurrent-basic-1.png b/tests/asgn3/eventual_consistency/lamport-convergence-concurrent-basic-1.png new file mode 100644 index 0000000000000000000000000000000000000000..6874c310b67dcad0c554bd7306b667702d76d201 Binary files /dev/null and b/tests/asgn3/eventual_consistency/lamport-convergence-concurrent-basic-1.png differ diff --git a/tests/asgn3/eventual_consistency/lamport-convergence-get-all.jpg b/tests/asgn3/eventual_consistency/lamport-convergence-get-all.jpg new file mode 100644 index 0000000000000000000000000000000000000000..af362172ab282a6ebdd310d8e51577a54fe7429c Binary files /dev/null and b/tests/asgn3/eventual_consistency/lamport-convergence-get-all.jpg differ diff --git a/tests/asgn3/eventual_consistency/lamport-convergence-get-all1.jpg b/tests/asgn3/eventual_consistency/lamport-convergence-get-all1.jpg new file mode 100644 index 0000000000000000000000000000000000000000..0d7dd17550b08070cb87ef6ba3422f7a8bca2617 Binary files /dev/null and b/tests/asgn3/eventual_consistency/lamport-convergence-get-all1.jpg differ diff 
--git a/tests/asgn3/eventual_consistency/lamport-convergence-happens-before-basic-1.png b/tests/asgn3/eventual_consistency/lamport-convergence-happens-before-basic-1.png new file mode 100644 index 0000000000000000000000000000000000000000..c2984751cc44ec862ce95adc7ee99201b639f38d Binary files /dev/null and b/tests/asgn3/eventual_consistency/lamport-convergence-happens-before-basic-1.png differ diff --git a/tests/asgn3/hello.py b/tests/asgn3/hello.py new file mode 100644 index 0000000000000000000000000000000000000000..f1b07fe0d728e248e184921d2226bf077ae892bd --- /dev/null +++ b/tests/asgn3/hello.py @@ -0,0 +1,45 @@ +from typing import List, Dict, Any, Optional +import requests + +from ...utils.containers import ClusterConductor +from ...utils.util import Logger +from ...utils.kvs_api import KVSClient + + +def hello_cluster(conductor: ClusterConductor, dir, log: Logger): + # create a cluster + log("\n> SPAWN CLUSTER") + conductor.spawn_cluster(node_count=2) + conductor.add_shard("shard1", conductor.get_nodes([0, 1])) + + # by default, all nodes are in the same partition, on the base network + # let's create two partitions, one with node 0 and one with node 1 + log("\n> CREATE PARTITIONS") + conductor.create_partition(node_ids=[0], partition_id="p0") + conductor.create_partition(node_ids=[1], partition_id="p1") + + # describe cluster + log("\n> DESCRIBE CLUSTER") + conductor.describe_cluster() + + # talk to node 0 in the cluster + log("\n> TALK TO NODE 0") + n0_ep = conductor.node_external_endpoint(0) + n0_client = KVSClient(n0_ep) + n0_client.ping().raise_for_status() + log(f" - node 0 is up at {n0_ep}") + + # talk to node 1 in the cluster + log("\n> TALK TO NODE 1") + n1_ep = conductor.node_external_endpoint(1) + n1_client = KVSClient(n1_ep) + n1_client.ping().raise_for_status() + log(f" - node 1 is up at {n1_ep}") + + conductor.dump_all_container_logs(dir) + # clean up + log("\n> DESTROY CLUSTER") + conductor.destroy_cluster() + + # return score/reason + return True, "ok" diff --git a/tests/asgn3/helper.py b/tests/asgn3/helper.py new file mode 100644 index 0000000000000000000000000000000000000000..f4271a5ceec8fa1732484d8355368bc99c5f0f61 --- /dev/null +++ b/tests/asgn3/helper.py @@ -0,0 +1,120 @@ +from typing import List, Dict, Any +from ..utils.kvs_api import DEFAULT_TIMEOUT + +from ..utils.containers import ClusterConductor +from ..utils.kvs_api import KVSClient +from ..utils.util import Logger + + +class KVSTestFixture: # conductor: ClusterConductor node_count: int + # clients: List[KVSClient] + + def __init__(self, conductor: ClusterConductor, dir, log: Logger, node_count: int): + self.conductor = conductor + self.dir = dir + self.node_count = node_count + self.clients = [] + self.log = log + + def spawn_cluster(self): + self.log("\n> SPAWN CLUSTER") + self.conductor.spawn_cluster(node_count=self.node_count) + + for i in range(self.node_count): + ep = self.conductor.node_external_endpoint(i) + self.clients.append(KVSClient(ep)) + + r = self.clients[i].ping() + assert r.status_code == 200, f"expected 200 for ping, got { + r.status_code}" + self.log(f" - node {i} is up: {r.text}") + + def broadcast_view(self, view: List[Dict[str, Any]]): + self.log(f"\n> SEND VIEW: {view}") + for i, client in enumerate(self.clients): + r = client.send_view(view) + assert ( + r.status_code == 200 + ), f"expected 200 to ack view, got {r.status_code}" + self.log(f"view sent to node {i}: {r.status_code} {r.text}") + + def send_view(self, node_id: int, view: List[Dict[str, Any]]): + r = 
+        r = self.clients[node_id].send_view(view)
+        assert r.status_code == 200, f"expected 200 to ack view, got {r.status_code}"
+        self.log(f"view sent to node {node_id}: {r.status_code} {r.text}")
+
+    def destroy_cluster(self):
+        self.conductor.dump_all_container_logs(self.dir)
+        self.log("\n> DESTROY CLUSTER")
+        self.conductor.destroy_cluster()
+
+    def __enter__(self):
+        self.spawn_cluster()
+        return self
+
+    def __exit__(self, exc_type, exc_value, traceback):
+        self.destroy_cluster()
+
+
+class KVSMultiClient:
+    def __init__(self, clients: List[KVSClient], name: str, log: Logger):
+        self.clients = clients
+        self.metadata = None
+        self.name = name
+        self.req = 0
+        self.log = log
+
+        # internal model of kvs
+        self._kvs_model = {}
+
+    def reset_model(self):
+        self._kvs_model = {}
+        self.metadata = None
+
+    def put(self, node_id: int, key: str, value: str, timeout: float = DEFAULT_TIMEOUT):
+        self.log(f"  {self.name} req_id:{self.req} > {node_id} > kvs.put {key} <- {value}")
+
+        r = self.clients[node_id].put(key, value, self.metadata, timeout=timeout)
+
+        # update model if successful
+        if r.status_code // 100 == 2:
+            self._kvs_model[key] = value
+            self.log(f"  {self.name} req_id:{self.req} {r.json()}")
+            self.metadata = r.json()["causal-metadata"]
+
+        self.req += 1
+        return r
+
+    def get(self, node_id: int, key: str, timeout: float = DEFAULT_TIMEOUT):
+        self.log(
+            f'  {self.name} req_id:{self.req} > {node_id} > kvs.get {key} '
+            f'request "causal-metadata": {self.metadata}'
+        )
+        r = self.clients[node_id].get(key, self.metadata, timeout=timeout)
+
+        if r.status_code // 100 == 2:
+            self.log(f"  {self.name} req_id:{self.req} > {node_id} > kvs.get {key} -> {r.json()}")
+            self.metadata = r.json()["causal-metadata"]
+        else:
+            self.log(f"  {self.name} req_id:{self.req} > {node_id} > kvs.get {key} -> HTTP ERROR {r.status_code}")
+
+        self.req += 1
+        return r
+
+    def get_all(self, node_id: int, timeout: float = DEFAULT_TIMEOUT):
+        self.log(
+            f'  {self.name} req_id:{self.req} > {node_id} > kvs.get_all '
+            f'request "causal-metadata": {self.metadata}'
+        )
+        r = self.clients[node_id].get_all(self.metadata, timeout=timeout)
+
+        if r.status_code // 100 == 2:
+            self.log(f"  {self.name} req_id:{self.req} > {node_id} > kvs.get_all -> {r.json()}")
+            self.metadata = r.json()["causal-metadata"]
+        else:
+            self.log(f"  {self.name} req_id:{self.req} > {node_id} > kvs.get_all -> HTTP ERROR {r.status_code}")
+
+        self.req += 1
+        return r
diff --git a/tests/asgn3/view_change/README.md b/tests/asgn3/view_change/README.md
new file mode 100644
index 0000000000000000000000000000000000000000..ecd2eca4b5da85fd7d2c9281965f9c40a8435b2f
--- /dev/null
+++ b/tests/asgn3/view_change/README.md
@@ -0,0 +1,27 @@
+# View Change
+
+## View Change Basic
+
+### View Change Basic 1
+
+#### Test Description
+Tests consistency across view changes: a node is dropped from the view and then a new one is added (the mapping onto harness calls is sketched below). The test gives the nodes the opportunity to converge with 10-second sleeps, so all servers *should* be consistent before any view change occurs.
+
+1. Send view containing A, B to nodes A and B
+2. c1.PUT(A, a=1)
+3. c2.PUT(B, a=2)
+4. c1.PUT(B, b=3)
+5. c2.PUT(A, c=4)
+6. sleep(10), allowing convergence for A and B
+7. Drop B from the view
+8. Add C to the view, update A and C with the new view
+9. c1.PUT(A, a=5)
+10. c2.PUT(C, e=6)
+11. sleep(10), allowing convergence for A and C
+12. c1.GETALL(A) == c2.GETALL(C)
+
+#### Explicit Violation
+A violation is witnessed if any write fails or if the KVS states do not match at the end.
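+#### Illustrative Harness Calls
+A minimal sketch (not part of the test code itself) of how steps 7-8 map onto the test harness: dropping B and adding C is done by re-partitioning the network and pushing a fresh view to the surviving node and the newly added node. The partition names below are illustrative only; the actual test in `view_change_basic.py` uses its own names.
+
+```python
+# A = node 0, B = node 1, C = node 2
+conductor.create_partition([0], "p_keep")      # A stays reachable on its own
+conductor.create_partition([1], "p_dropped")   # B is cut off from the others
+conductor.create_partition([0, 2], "p_new")    # A and C can now talk
+new_view = conductor.get_partition_view("p_new")
+fx.send_view(0, new_view)                      # tell A about the new membership
+fx.send_view(2, new_view)                      # tell C about the new membership
+```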
+
+#### Lamport Diagram
+
\ No newline at end of file
diff --git a/tests/asgn3/view_change/lamport-view-change-basic-1.jpg b/tests/asgn3/view_change/lamport-view-change-basic-1.jpg
new file mode 100644
index 0000000000000000000000000000000000000000..f3b24cb34d605ed92327f7b8b9efedeb60cd9d30
Binary files /dev/null and b/tests/asgn3/view_change/lamport-view-change-basic-1.jpg differ
diff --git a/tests/asgn3/view_change/view_change_basic.py b/tests/asgn3/view_change/view_change_basic.py
new file mode 100644
index 0000000000000000000000000000000000000000..c29aff69480c0297bb589bbf0a9fe58fdbb7977e
--- /dev/null
+++ b/tests/asgn3/view_change/view_change_basic.py
@@ -0,0 +1,78 @@
+from time import sleep
+
+from ....utils.containers import ClusterConductor
+from ....utils.testcase import TestCase
+from ....utils.util import Logger
+from ...helper import KVSMultiClient, KVSTestFixture
+
+
+# Starts with two nodes in a partition that receive writes. After enough
+# time has passed for convergence, drop one node from the view and then
+# add a new node to the view.
+def view_change_basic(conductor: ClusterConductor, dir, log: Logger):
+    with KVSTestFixture(conductor, dir, log, node_count=3) as fx:
+        conductor.add_shard("shard1", conductor.get_nodes([0, 1, 2]))
+        alice = KVSMultiClient(fx.clients, "alice", log)
+        bob = KVSMultiClient(fx.clients, "bob", log)
+
+        log("\n> TEST VIEW CHANGE BASIC 1")
+
+        # create a view for only the [0, 1] partition, send it to nodes 0 and 1
+        conductor.create_partition([0, 1], "p0")
+        conductor.create_partition([2], "p1")
+        fx.send_view(0, conductor.get_partition_view("p0"))
+        fx.send_view(1, conductor.get_partition_view("p0"))
+
+        log("\n> NETWORK TOPOLOGY")
+        conductor.describe_cluster()
+
+        # send some writes to the main view
+        r = alice.put(0, "a", "1")
+        assert r.ok, f"expected ok for put, got {r.status_code}"
+
+        r = bob.put(1, "a", "2")
+        assert r.ok, f"expected ok for put, got {r.status_code}"
+
+        r = alice.put(1, "b", "3")
+        assert r.ok, f"expected ok for put, got {r.status_code}"
+
+        r = bob.put(0, "c", "4")
+        assert r.ok, f"expected ok for put, got {r.status_code}"
+
+        # wait for convergence
+        sleep(10)
+
+        # drop node 1 from the view: isolate it and send node 0 a view without it
+        conductor.create_partition([0], "p2")
+        conductor.create_partition([1], "p3")  # isolate node 1
+        fx.send_view(0, conductor.get_partition_view("p2"))
+
+        # add node 2 to the view
+        conductor.create_partition([0, 2], "p4")
+        fx.send_view(0, conductor.get_partition_view("p4"))
+        fx.send_view(2, conductor.get_partition_view("p4"))
+
+        log("\n> NETWORK TOPOLOGY after view changes")
+        conductor.describe_cluster()
+
+        r = alice.put(0, "a", "5")
+        assert r.ok, f"expected ok for put, got {r.status_code}"
+
+        r = bob.put(2, "e", "6")
+        assert r.ok, f"expected ok for put, got {r.status_code}"
+
+        # wait for convergence
+        sleep(10)
+
+        KVS_state_0 = alice.get_all(0)
+        assert KVS_state_0.ok, f"expected ok for get_all, got {KVS_state_0.status_code}"
+
+        KVS_state_1 = bob.get_all(2)
+        assert KVS_state_1.ok, f"expected ok for get_all, got {KVS_state_1.status_code}"
+
+        assert KVS_state_0.json()["items"] == KVS_state_1.json()["items"], (
+            f"expected KVS states to be equal, got {KVS_state_0.json()} and {KVS_state_1.json()}"
+        )
+
+        return True, "ok"
+
+
+VIEW_CHANGE_TESTS = [
+    TestCase("view_change_basic", view_change_basic),
+]
\ No newline at end of file