From 916a01e05945a267bf8640217b4c7f59c454d50a Mon Sep 17 00:00:00 2001 From: zphrs <z@zephiris.dev> Date: Fri, 14 Mar 2025 21:23:34 -0700 Subject: [PATCH] fixed issue with not sending metadata with None timeout --- utils/containers.py | 8 ++++---- utils/kvs_api.py | 25 ++++++++++++++++--------- 2 files changed, 20 insertions(+), 13 deletions(-) diff --git a/utils/containers.py b/utils/containers.py index efc6ebf..16903ba 100644 --- a/utils/containers.py +++ b/utils/containers.py @@ -439,8 +439,8 @@ class ClusterConductor: node.index ].base_url = self.node_external_endpoint(node.index) view_changed = True - # if view_changed and hasattr(self, "_parent"): - # self._parent.rebroadcast_view(self.get_shard_view()) + if view_changed and hasattr(self, "_parent"): + self._parent.rebroadcast_view(self.get_shard_view()) def create_partition(self, node_ids: List[int], partition_id: str) -> None: net_name = f"kvs_{self.group_id}_net_{partition_id}" @@ -505,8 +505,8 @@ class ClusterConductor: node.index ].base_url = self.node_external_endpoint(node.index) view_changed = True - # if view_changed and hasattr(self, "_parent"): - # self._parent.rebroadcast_view(self.get_shard_view()) + if view_changed and hasattr(self, "_parent"): + self._parent.rebroadcast_view(self.get_shard_view()) DeprecationWarning("View is in updated format") diff --git a/utils/kvs_api.py b/utils/kvs_api.py index c630a50..c70efa5 100644 --- a/utils/kvs_api.py +++ b/utils/kvs_api.py @@ -110,7 +110,7 @@ class KVSClient: r.status_code = REQUEST_TIMEOUT_STATUS_CODE return r else: - return requests.get(f"{self.base_url}/data") + return requests.get(f"{self.base_url}/data", json=create_json(metadata)) def clear(self, timeout: float = DEFAULT_TIMEOUT) -> None: response = self.get_all(timeout=timeout) @@ -134,21 +134,29 @@ class KVSClient: request_body = {"view": view} return requests.put(f"{self.base_url}/view", json=request_body, timeout=timeout) - async def async_send_view(self, view: dict[str, List[Dict[str, Any]]], timeout: float = DEFAULT_TIMEOUT) -> aiohttp.ClientResponse: + async def async_send_view( + self, view: dict[str, List[Dict[str, Any]]], timeout: float = DEFAULT_TIMEOUT + ) -> aiohttp.ClientResponse: if not isinstance(view, dict): raise ValueError("view must be a dict") - + self.last_view = view request_body = {"view": view} async with aiohttp.ClientSession() as session: - async with session.put(f"{self.base_url}/view", json=request_body, timeout=timeout) as response: + async with session.put( + f"{self.base_url}/view", json=request_body, timeout=timeout + ) as response: return response if response.status != 200: raise RuntimeError(f"failed to send view: {response.status}") return response - def resend_last_view_with_ips_from_new_view(self, current_view: dict[str, List[Dict[str, Any]]], timeout: float = DEFAULT_TIMEOUT) -> requests.Response: + def resend_last_view_with_ips_from_new_view( + self, + current_view: dict[str, List[Dict[str, Any]]], + timeout: float = DEFAULT_TIMEOUT, + ) -> requests.Response: if not isinstance(current_view, dict): raise ValueError("view must be a dict") if not hasattr(self, "last_view"): @@ -158,12 +166,11 @@ class KVSClient: for shard_key in current_view: for node in current_view[shard_key]: flattened_current_view[node["id"]] = node["address"] - + for shard_key in self.last_view: for node in self.last_view[shard_key]: - node['address'] = flattened_current_view[node["id"]] - + node["address"] = flattened_current_view[node["id"]] + request_body = {"view": self.last_view} print(f"Sending new view: {self.last_view}") return requests.put(f"{self.base_url}/view", json=request_body, timeout=timeout) - \ No newline at end of file -- GitLab