|
1 |
| -# The MIT License (MIT) |
2 |
| -# Copyright © 2024 Opentensor Foundation |
3 |
| -# |
4 |
| -# Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated |
5 |
| -# documentation files (the “Software”), to deal in the Software without restriction, including without limitation |
6 |
| -# the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, |
7 |
| -# and to permit persons to whom the Software is furnished to do so, subject to the following conditions: |
8 |
| -# |
9 |
| -# The above copyright notice and this permission notice shall be included in all copies or substantial portions of |
10 |
| -# the Software. |
11 |
| -# |
12 |
| -# THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO |
13 |
| -# THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL |
14 |
| -# THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION |
15 |
| -# OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER |
16 |
| -# DEALINGS IN THE SOFTWARE. |
17 |
| - |
18 | 1 | """Utils for handling local network with ip and ports."""
|
19 | 2 |
|
20 | 3 | import json
|
|
26 | 9 |
|
27 | 10 | import netaddr
|
28 | 11 | import requests
|
| 12 | +from retry import retry |
| 13 | +from websocket import WebSocketConnectionClosedException |
29 | 14 |
|
30 | 15 | from bittensor.utils.btlogging import logging
|
31 | 16 |
|
@@ -178,22 +163,49 @@ def get_formatted_ws_endpoint_url(endpoint_url: Optional[str]) -> Optional[str]:
|
178 | 163 | def ensure_connected(func):
|
179 | 164 | """Decorator ensuring the function executes with an active substrate connection."""
|
180 | 165 |
|
| 166 | + def is_connected(substrate) -> bool: |
| 167 | + """Check if the substrate connection is active.""" |
| 168 | + sock = substrate.websocket.sock |
| 169 | + return ( |
| 170 | + sock is not None |
| 171 | + and sock.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR) == 0 |
| 172 | + ) |
| 173 | + |
| 174 | + @retry( |
| 175 | + exceptions=ConnectionRefusedError, |
| 176 | + tries=5, |
| 177 | + delay=5, |
| 178 | + backoff=1, |
| 179 | + logger=logging, |
| 180 | + ) |
| 181 | + def reconnect_with_retries(self): |
| 182 | + """Attempt to reconnect with retries using retry library.""" |
| 183 | + logging.info("Attempting to reconnect to substrate...") |
| 184 | + self._get_substrate() |
| 185 | + |
| 186 | + old_level = logging.get_level() |
| 187 | + logging.set_info() |
| 188 | + logging.success("Connection successfully restored!") |
| 189 | + logging.setLevel(old_level) |
| 190 | + |
181 | 191 | @wraps(func)
|
182 | 192 | def wrapper(self, *args, **kwargs):
|
183 |
| - """Wrapper function where `self` argument is Subtensor instance with the substrate connection.""" |
184 |
| - # Check the socket state before method execution |
185 |
| - if ( |
186 |
| - # connection was closed correctly |
187 |
| - self.substrate.websocket.sock is None |
188 |
| - # connection has a broken pipe |
189 |
| - or self.substrate.websocket.sock.getsockopt( |
190 |
| - socket.SOL_SOCKET, socket.SO_ERROR |
191 |
| - ) |
192 |
| - != 0 |
193 |
| - ): |
194 |
| - logging.debug("Reconnecting to substrate...") |
| 193 | + """Wrapper function where `self` is expected to be a Subtensor instance.""" |
| 194 | + if not is_connected(self.substrate): |
| 195 | + logging.debug("Substrate connection inactive. Attempting to reconnect...") |
195 | 196 | self._get_substrate()
|
196 |
| - # Execute the method if the connection is active or after reconnecting |
197 |
| - return func(self, *args, **kwargs) |
| 197 | + |
| 198 | + try: |
| 199 | + return func(self, *args, **kwargs) |
| 200 | + except WebSocketConnectionClosedException: |
| 201 | + logging.warning( |
| 202 | + "WebSocket connection closed. Attempting to reconnect 5 times..." |
| 203 | + ) |
| 204 | + try: |
| 205 | + reconnect_with_retries(self) |
| 206 | + return func(self, *args, **kwargs) |
| 207 | + except ConnectionRefusedError: |
| 208 | + logging.error("Unable to restore connection. Raising exception.") |
| 209 | + raise ConnectionRefusedError("Failed to reconnect to substrate.") |
198 | 210 |
|
199 | 211 | return wrapper
|
0 commit comments