Skip to content
Snippets Groups Projects
Commit 916a01e0 authored by zphrs's avatar zphrs
Browse files

fixed issue with not sending metadata with None timeout

parent d7a0e6a1
No related branches found
No related tags found
No related merge requests found
......@@ -439,8 +439,8 @@ class ClusterConductor:
node.index
].base_url = self.node_external_endpoint(node.index)
view_changed = True
# if view_changed and hasattr(self, "_parent"):
# self._parent.rebroadcast_view(self.get_shard_view())
if view_changed and hasattr(self, "_parent"):
self._parent.rebroadcast_view(self.get_shard_view())
def create_partition(self, node_ids: List[int], partition_id: str) -> None:
net_name = f"kvs_{self.group_id}_net_{partition_id}"
......@@ -505,8 +505,8 @@ class ClusterConductor:
node.index
].base_url = self.node_external_endpoint(node.index)
view_changed = True
# if view_changed and hasattr(self, "_parent"):
# self._parent.rebroadcast_view(self.get_shard_view())
if view_changed and hasattr(self, "_parent"):
self._parent.rebroadcast_view(self.get_shard_view())
DeprecationWarning("View is in updated format")
......
......@@ -110,7 +110,7 @@ class KVSClient:
r.status_code = REQUEST_TIMEOUT_STATUS_CODE
return r
else:
return requests.get(f"{self.base_url}/data")
return requests.get(f"{self.base_url}/data", json=create_json(metadata))
def clear(self, timeout: float = DEFAULT_TIMEOUT) -> None:
response = self.get_all(timeout=timeout)
......@@ -134,21 +134,29 @@ class KVSClient:
request_body = {"view": view}
return requests.put(f"{self.base_url}/view", json=request_body, timeout=timeout)
async def async_send_view(self, view: dict[str, List[Dict[str, Any]]], timeout: float = DEFAULT_TIMEOUT) -> aiohttp.ClientResponse:
async def async_send_view(
self, view: dict[str, List[Dict[str, Any]]], timeout: float = DEFAULT_TIMEOUT
) -> aiohttp.ClientResponse:
if not isinstance(view, dict):
raise ValueError("view must be a dict")
self.last_view = view
request_body = {"view": view}
async with aiohttp.ClientSession() as session:
async with session.put(f"{self.base_url}/view", json=request_body, timeout=timeout) as response:
async with session.put(
f"{self.base_url}/view", json=request_body, timeout=timeout
) as response:
return response
if response.status != 200:
raise RuntimeError(f"failed to send view: {response.status}")
return response
def resend_last_view_with_ips_from_new_view(self, current_view: dict[str, List[Dict[str, Any]]], timeout: float = DEFAULT_TIMEOUT) -> requests.Response:
def resend_last_view_with_ips_from_new_view(
self,
current_view: dict[str, List[Dict[str, Any]]],
timeout: float = DEFAULT_TIMEOUT,
) -> requests.Response:
if not isinstance(current_view, dict):
raise ValueError("view must be a dict")
if not hasattr(self, "last_view"):
......@@ -158,12 +166,11 @@ class KVSClient:
for shard_key in current_view:
for node in current_view[shard_key]:
flattened_current_view[node["id"]] = node["address"]
for shard_key in self.last_view:
for node in self.last_view[shard_key]:
node['address'] = flattened_current_view[node["id"]]
node["address"] = flattened_current_view[node["id"]]
request_body = {"view": self.last_view}
print(f"Sending new view: {self.last_view}")
return requests.put(f"{self.base_url}/view", json=request_body, timeout=timeout)
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment