From 515ae58f45d623991c139ac5a845a62a47eef06c Mon Sep 17 00:00:00 2001 From: Justin Bull Date: Mon, 4 Dec 2023 21:03:42 -0500 Subject: [PATCH 1/7] [CHANGE] Use context manager syntax for Lock This is a safer and more readable way to prevent accidental missed release() calls on logical branching of code/execution flow. Functions that may not be readily converted to this format have been tagged as TODOs for potential future investigation after refactoring. --- pyVoIP/RTP.py | 10 ++--- pyVoIP/SIP.py | 98 ++++++++++++++++++++++----------------------- pyVoIP/VoIP/VoIP.py | 25 +++++------- 3 files changed, 64 insertions(+), 69 deletions(-) diff --git a/pyVoIP/RTP.py b/pyVoIP/RTP.py index ac329d1..412a68c 100644 --- a/pyVoIP/RTP.py +++ b/pyVoIP/RTP.py @@ -170,11 +170,10 @@ def read(self, length: int = 160) -> bytes: # This acts functionally as a lock while the buffer is being rebuilt. while self.rebuilding: time.sleep(0.01) - self.bufferLock.acquire() - packet = self.buffer.read(length) - if len(packet) < length: - packet = packet + (b"\x80" * (length - len(packet))) - self.bufferLock.release() + with self.bufferLock: + packet = self.buffer.read(length) + if len(packet) < length: + packet = packet + (b"\x80" * (length - len(packet))) return packet def rebuild(self, reset: bool, offset: int = 0, data: bytes = b"") -> None: @@ -192,6 +191,7 @@ def rebuild(self, reset: bool, offset: int = 0, data: bytes = b"") -> None: self.rebuilding = False def write(self, offset: int, data: bytes) -> None: + # TODO: Can this safely be changed to use context manager syntax? self.bufferLock.acquire() self.log[offset] = data bufferloc = self.buffer.tell() diff --git a/pyVoIP/SIP.py b/pyVoIP/SIP.py index 6e48631..4ec8a6c 100644 --- a/pyVoIP/SIP.py +++ b/pyVoIP/SIP.py @@ -844,6 +844,8 @@ def __init__( def recv(self) -> None: while self.NSD: + # TODO: Can this safely be changed to use context manager syntax + # or does the BlockingIOError handler ruin that? self.recvLock.acquire() self.s.setblocking(False) try: @@ -1595,52 +1597,49 @@ def invite( invite = self.genInvite( number, str(sess_id), ms, sendtype, branch, call_id ) - self.recvLock.acquire() - self.out.sendto(invite.encode("utf8"), (self.server, self.port)) - debug("Invited") - response = SIPMessage(self.s.recv(8192)) - - while ( - response.status != SIPStatus(401) - and response.status != SIPStatus(100) - and response.status != SIPStatus(180) - ) or response.headers["Call-ID"] != call_id: - if not self.NSD: - break - self.parseMessage(response) + with self.recvLock: + self.out.sendto(invite.encode("utf8"), (self.server, self.port)) + debug("Invited") response = SIPMessage(self.s.recv(8192)) - if response.status == SIPStatus(100) or response.status == SIPStatus( - 180 - ): - self.recvLock.release() - return SIPMessage(invite.encode("utf8")), call_id, sess_id - debug(f"Received Response: {response.summary()}") - ack = self.genAck(response) - self.out.sendto(ack.encode("utf8"), (self.server, self.port)) - debug("Acknowledged") - authhash = self.genAuthorization(response) - nonce = response.authentication["nonce"] - realm = response.authentication["realm"] - auth = ( - f'Authorization: Digest username="{self.username}",realm=' - + f'"{realm}",nonce="{nonce}",uri="sip:{self.server};' - + f'transport=UDP",response="{str(authhash, "utf8")}",' - + "algorithm=MD5\r\n" - ) - - invite = self.genInvite( - number, str(sess_id), ms, sendtype, branch, call_id - ) - invite = invite.replace( - "\r\nContent-Length", f"\r\n{auth}Content-Length" - ) + while ( + response.status != SIPStatus(401) + and response.status != SIPStatus(100) + and response.status != SIPStatus(180) + ) or response.headers["Call-ID"] != call_id: + if not self.NSD: + break + self.parseMessage(response) + response = SIPMessage(self.s.recv(8192)) + + if response.status == SIPStatus( + 100 + ) or response.status == SIPStatus(180): + return SIPMessage(invite.encode("utf8")), call_id, sess_id + debug(f"Received Response: {response.summary()}") + ack = self.genAck(response) + self.out.sendto(ack.encode("utf8"), (self.server, self.port)) + debug("Acknowledged") + authhash = self.genAuthorization(response) + nonce = response.authentication["nonce"] + realm = response.authentication["realm"] + auth = ( + f'Authorization: Digest username="{self.username}",realm=' + + f'"{realm}",nonce="{nonce}",uri="sip:{self.server};' + + f'transport=UDP",response="{str(authhash, "utf8")}",' + + "algorithm=MD5\r\n" + ) - self.out.sendto(invite.encode("utf8"), (self.server, self.port)) + invite = self.genInvite( + number, str(sess_id), ms, sendtype, branch, call_id + ) + invite = invite.replace( + "\r\nContent-Length", f"\r\n{auth}Content-Length" + ) - self.recvLock.release() + self.out.sendto(invite.encode("utf8"), (self.server, self.port)) - return SIPMessage(invite.encode("utf8")), call_id, sess_id + return SIPMessage(invite.encode("utf8")), call_id, sess_id def bye(self, request: SIPMessage) -> None: message = self.genBye(request) @@ -1664,6 +1663,8 @@ def deregister(self) -> bool: return False def __deregister(self) -> bool: + # TODO: Can this safely be changed to use context manager syntax or + # does the 5s recursive deregister() ruin that? self.recvLock.acquire() self.phone._status = PhoneStatus.DEREGISTERING firstRequest = self.genFirstRequest(deregister=True) @@ -1763,6 +1764,8 @@ def __start_register_timer(self, delay: Optional[int] = None): self.registerThread.start() def __register(self) -> bool: + # TODO: Can this safely be changed to use context manager syntax or + # does the 5s recursive register() ruin that? self.recvLock.acquire() self.phone._status = PhoneStatus.REGISTERING firstRequest = self.genFirstRequest() @@ -1872,16 +1875,13 @@ def _handle_bad_request(self) -> None: def subscribe(self, lastresponse: SIPMessage) -> None: # TODO: check if needed and maybe implement fully - self.recvLock.acquire() - - subRequest = self.genSubscribe(lastresponse) - self.out.sendto(subRequest.encode("utf8"), (self.server, self.port)) - - response = SIPMessage(self.s.recv(8192)) + with self.recvLock: + subRequest = self.genSubscribe(lastresponse) + self.out.sendto(subRequest.encode("utf8"), (self.server, self.port)) - debug(f'Got response to subscribe: {str(response.heading, "utf8")}') + response = SIPMessage(self.s.recv(8192)) - self.recvLock.release() + debug(f'Got response to subscribe: {str(response.heading, "utf8")}') def trying_timeout_check(self, response: SIPMessage) -> SIPMessage: """ diff --git a/pyVoIP/VoIP/VoIP.py b/pyVoIP/VoIP/VoIP.py index 6174e98..f6132ad 100644 --- a/pyVoIP/VoIP/VoIP.py +++ b/pyVoIP/VoIP/VoIP.py @@ -235,12 +235,11 @@ def __del__(self): self.phone.release_ports(call=self) def dtmf_callback(self, code: str) -> None: - self.dtmfLock.acquire() - bufferloc = self.dtmf.tell() - self.dtmf.seek(0, 2) - self.dtmf.write(code) - self.dtmf.seek(bufferloc, 0) - self.dtmfLock.release() + with self.dtmfLock: + bufferloc = self.dtmf.tell() + self.dtmf.seek(0, 2) + self.dtmf.write(code) + self.dtmf.seek(bufferloc, 0) def getDTMF(self, length=1) -> str: warnings.warn( @@ -252,10 +251,9 @@ def getDTMF(self, length=1) -> str: return self.get_dtmf(length) def get_dtmf(self, length=1) -> str: - self.dtmfLock.acquire() - packet = self.dtmf.read(length) - self.dtmfLock.release() - return packet + with self.dtmfLock: + packet = self.dtmf.read(length) + return packet def genMs(self) -> Dict[int, Dict[int, RTP.PayloadType]]: warnings.warn( @@ -737,9 +735,8 @@ def request_port(self, blocking=True) -> int: return selection def release_ports(self, call: Optional[VoIPCall] = None) -> None: - self.portsLock.acquire() - self._cleanup_dead_calls() - try: + with self.portsLock: + self._cleanup_dead_calls() if isinstance(call, VoIPCall): ports = list(call.assignedPorts.keys()) else: @@ -753,8 +750,6 @@ def release_ports(self, call: Optional[VoIPCall] = None) -> None: for port in ports: self.assignedPorts.remove(port) - finally: - self.portsLock.release() def _cleanup_dead_calls(self) -> None: to_delete = [] From 90f9083c5c96f6636f76be1d47f1a029c8b7494f Mon Sep 17 00:00:00 2001 From: Justin Bull Date: Mon, 4 Dec 2023 21:31:15 -0500 Subject: [PATCH 2/7] [CHANGE] Use Lock context manager for __register() Accomplished by refactoring released lock with sleep and recursive call out of the middle of the function and instead raising an exception to the main calling register() function. This should be functionally equivalent whilst allowing for the context manager syntax, which is safer against missing release() calls. --- pyVoIP/SIP.py | 194 ++++++++++++++++++++++++++------------------------ 1 file changed, 100 insertions(+), 94 deletions(-) diff --git a/pyVoIP/SIP.py b/pyVoIP/SIP.py index 4ec8a6c..aa57b4f 100644 --- a/pyVoIP/SIP.py +++ b/pyVoIP/SIP.py @@ -40,6 +40,10 @@ class SIPParseError(Exception): pass +class RetryRequiredError(Exception): + pass + + class Counter: def __init__(self, start: int = 1): self.x = start @@ -1749,6 +1753,9 @@ def register(self) -> bool: self.stop() self.fatalCallback() return False + if isinstance(e, RetryRequiredError): + time.sleep(5) + return self.register() self.__start_register_timer(delay=0) def __start_register_timer(self, delay: Optional[int] = None): @@ -1764,107 +1771,102 @@ def __start_register_timer(self, delay: Optional[int] = None): self.registerThread.start() def __register(self) -> bool: - # TODO: Can this safely be changed to use context manager syntax or - # does the 5s recursive register() ruin that? - self.recvLock.acquire() - self.phone._status = PhoneStatus.REGISTERING - firstRequest = self.genFirstRequest() - self.out.sendto(firstRequest.encode("utf8"), (self.server, self.port)) - - self.out.setblocking(False) - - ready = select.select([self.out], [], [], self.register_timeout) - if ready[0]: - resp = self.s.recv(8192) - else: - self.recvLock.release() - raise TimeoutError("Registering on SIP Server timed out") - - response = SIPMessage(resp) - response = self.trying_timeout_check(response) - first_response = response - - if response.status == SIPStatus(400): - # Bad Request - # TODO: implement - # TODO: check if broken connection can be brought back - # with new urn:uuid or reply with expire 0 - self._handle_bad_request() - - if response.status == SIPStatus(401): - # Unauthorized, likely due to being password protected. - regRequest = self.genRegister(response) + with self.recvLock: + self.phone._status = PhoneStatus.REGISTERING + firstRequest = self.genFirstRequest() self.out.sendto( - regRequest.encode("utf8"), (self.server, self.port) + firstRequest.encode("utf8"), (self.server, self.port) ) - ready = select.select([self.s], [], [], self.register_timeout) + + self.out.setblocking(False) + + ready = select.select([self.out], [], [], self.register_timeout) if ready[0]: resp = self.s.recv(8192) - response = SIPMessage(resp) - response = self.trying_timeout_check(response) - if response.status == SIPStatus(401): - # At this point, it's reasonable to assume that - # this is caused by invalid credentials. - debug("=" * 50) - debug("Unauthorized, SIP Message Log:\n") - debug("SENT") - debug(firstRequest) - debug("\nRECEIVED") - debug(first_response.summary()) - debug("\nSENT (DO NOT SHARE THIS PACKET)") - debug(regRequest) - debug("\nRECEIVED") - debug(response.summary()) - debug("=" * 50) - self.recvLock.release() - raise InvalidAccountInfoError( - "Invalid Username or " - + "Password for SIP server " - + f"{self.server}:" - + f"{self.myPort}" - ) - elif response.status == SIPStatus(400): - # Bad Request - # TODO: implement - # TODO: check if broken connection can be brought back - # with new urn:uuid or reply with expire 0 - self._handle_bad_request() else: - self.recvLock.release() raise TimeoutError("Registering on SIP Server timed out") - if response.status == SIPStatus(407): - # Proxy Authentication Required - # TODO: implement - debug("Proxy auth required") - - # TODO: This must be done more reliable - if response.status not in [ - SIPStatus(400), - SIPStatus(401), - SIPStatus(407), - ]: - # Unauthorized - if response.status == SIPStatus(500): - self.recvLock.release() - time.sleep(5) - return self.register() - else: - # TODO: determine if needed here - self.parseMessage(response) + response = SIPMessage(resp) + response = self.trying_timeout_check(response) + first_response = response + + if response.status == SIPStatus(400): + # Bad Request + # TODO: implement + # TODO: check if broken connection can be brought back + # with new urn:uuid or reply with expire 0 + self._handle_bad_request() + + if response.status == SIPStatus(401): + # Unauthorized, likely due to being password protected. + regRequest = self.genRegister(response) + self.out.sendto( + regRequest.encode("utf8"), (self.server, self.port) + ) + ready = select.select([self.s], [], [], self.register_timeout) + if ready[0]: + resp = self.s.recv(8192) + response = SIPMessage(resp) + response = self.trying_timeout_check(response) + if response.status == SIPStatus(401): + # At this point, it's reasonable to assume that + # this is caused by invalid credentials. + debug("=" * 50) + debug("Unauthorized, SIP Message Log:\n") + debug("SENT") + debug(firstRequest) + debug("\nRECEIVED") + debug(first_response.summary()) + debug("\nSENT (DO NOT SHARE THIS PACKET)") + debug(regRequest) + debug("\nRECEIVED") + debug(response.summary()) + debug("=" * 50) + raise InvalidAccountInfoError( + "Invalid Username or " + + "Password for SIP server " + + f"{self.server}:" + + f"{self.myPort}" + ) + elif response.status == SIPStatus(400): + # Bad Request + # TODO: implement + # TODO: check if broken connection can be brought back + # with new urn:uuid or reply with expire 0 + self._handle_bad_request() + else: + raise TimeoutError("Registering on SIP Server timed out") + + if response.status == SIPStatus(407): + # Proxy Authentication Required + # TODO: implement + debug("Proxy auth required") + + # TODO: This must be done more reliable + if response.status not in [ + SIPStatus(400), + SIPStatus(401), + SIPStatus(407), + ]: + # Unauthorized + if response.status == SIPStatus(500): + # We raise so the calling function can sleep and try again + raise RetryRequiredError("Response SIP status of 500") + else: + # TODO: determine if needed here + self.parseMessage(response) - debug(response.summary()) - debug(response.raw) + debug(response.summary()) + debug(response.raw) - self.recvLock.release() - if response.status == SIPStatus.OK: - return True - else: - raise InvalidAccountInfoError( - "Invalid Username or Password for " - + f"SIP server {self.server}:" - + f"{self.myPort}" - ) + if response.status == SIPStatus.OK: + return True + else: + raise InvalidAccountInfoError( + "Invalid Username or Password for " + + f"SIP server {self.server}:" + + f"{self.myPort}" + ) def _handle_bad_request(self) -> None: # Bad Request @@ -1877,11 +1879,15 @@ def subscribe(self, lastresponse: SIPMessage) -> None: # TODO: check if needed and maybe implement fully with self.recvLock: subRequest = self.genSubscribe(lastresponse) - self.out.sendto(subRequest.encode("utf8"), (self.server, self.port)) + self.out.sendto( + subRequest.encode("utf8"), (self.server, self.port) + ) response = SIPMessage(self.s.recv(8192)) - debug(f'Got response to subscribe: {str(response.heading, "utf8")}') + debug( + f'Got response to subscribe: {str(response.heading, "utf8")}' + ) def trying_timeout_check(self, response: SIPMessage) -> SIPMessage: """ From 6962b067ad96ddd9ce1bf4c14f3dff6ff530d200 Mon Sep 17 00:00:00 2001 From: Justin Bull Date: Mon, 4 Dec 2023 21:38:57 -0500 Subject: [PATCH 3/7] [CHANGE] Use Lock context manager for __deregister() Accomplished by refactoring released lock with sleep and recursive call out of the middle of the function and instead raising an exception to the main calling deregister() function. This should be functionally equivalent whilst allowing for the context manager syntax, which is safer against missing release() calls. --- pyVoIP/SIP.py | 103 +++++++++++++++++++++++++------------------------- 1 file changed, 51 insertions(+), 52 deletions(-) diff --git a/pyVoIP/SIP.py b/pyVoIP/SIP.py index aa57b4f..9178d70 100644 --- a/pyVoIP/SIP.py +++ b/pyVoIP/SIP.py @@ -1662,71 +1662,70 @@ def deregister(self) -> bool: return deregistered except BaseException as e: debug(f"DEREGISTERATION ERROR: {e}") + # TODO: a maximum tries check should be implemented otherwise a + # RecursionError will throw + if isinstance(e, RetryRequiredError): + time.sleep(5) + return self.deregister() if type(e) is OSError: raise return False def __deregister(self) -> bool: - # TODO: Can this safely be changed to use context manager syntax or - # does the 5s recursive deregister() ruin that? - self.recvLock.acquire() - self.phone._status = PhoneStatus.DEREGISTERING - firstRequest = self.genFirstRequest(deregister=True) - self.out.sendto(firstRequest.encode("utf8"), (self.server, self.port)) - - self.out.setblocking(False) - - ready = select.select([self.out], [], [], self.register_timeout) - if ready[0]: - resp = self.s.recv(8192) - else: - self.recvLock.release() - raise TimeoutError("Deregistering on SIP Server timed out") - - response = SIPMessage(resp) - response = self.trying_timeout_check(response) - - if response.status == SIPStatus(401): - # Unauthorized, likely due to being password protected. - regRequest = self.genRegister(response, deregister=True) + with self.recvLock: + self.phone._status = PhoneStatus.DEREGISTERING + firstRequest = self.genFirstRequest(deregister=True) self.out.sendto( - regRequest.encode("utf8"), (self.server, self.port) + firstRequest.encode("utf8"), (self.server, self.port) ) - ready = select.select([self.s], [], [], self.register_timeout) + + self.out.setblocking(False) + + ready = select.select([self.out], [], [], self.register_timeout) if ready[0]: resp = self.s.recv(8192) - response = SIPMessage(resp) - if response.status == SIPStatus(401): - # At this point, it's reasonable to assume that - # this is caused by invalid credentials. - debug("Unauthorized") - self.recvLock.release() - raise InvalidAccountInfoError( - "Invalid Username or " - + "Password for SIP server " - + f"{self.server}:" - + f"{self.myPort}" - ) - elif response.status == SIPStatus(400): - # Bad Request - # TODO: implement - # TODO: check if broken connection can be brought back - # with new urn:uuid or reply with expire 0 - self._handle_bad_request() else: - self.recvLock.release() raise TimeoutError("Deregistering on SIP Server timed out") - if response.status == SIPStatus(500): - self.recvLock.release() - time.sleep(5) - return self.deregister() + response = SIPMessage(resp) + response = self.trying_timeout_check(response) - if response.status == SIPStatus.OK: - self.recvLock.release() - return True - self.recvLock.release() - return False + if response.status == SIPStatus(401): + # Unauthorized, likely due to being password protected. + regRequest = self.genRegister(response, deregister=True) + self.out.sendto( + regRequest.encode("utf8"), (self.server, self.port) + ) + ready = select.select([self.s], [], [], self.register_timeout) + if ready[0]: + resp = self.s.recv(8192) + response = SIPMessage(resp) + if response.status == SIPStatus(401): + # At this point, it's reasonable to assume that + # this is caused by invalid credentials. + debug("Unauthorized") + raise InvalidAccountInfoError( + "Invalid Username or " + + "Password for SIP server " + + f"{self.server}:" + + f"{self.myPort}" + ) + elif response.status == SIPStatus(400): + # Bad Request + # TODO: implement + # TODO: check if broken connection can be brought back + # with new urn:uuid or reply with expire 0 + self._handle_bad_request() + else: + raise TimeoutError("Deregistering on SIP Server timed out") + + if response.status == SIPStatus(500): + # We raise so the calling function can sleep and try again + raise RetryRequiredError("Response SIP status of 500") + + if response.status == SIPStatus.OK: + return True + return False def register(self) -> bool: try: From 5a4d96b2c590bfa20cad548fde33b71c48020002 Mon Sep 17 00:00:00 2001 From: Justin Bull Date: Mon, 4 Dec 2023 21:41:06 -0500 Subject: [PATCH 4/7] [CHANGE] Move lock aquisition to wrap around __(de)register() calls This is just cleaner code without changing much. __(de)register() no longer safely locks on its, but only their public equivalents call them anyway. We can hoist the context manager wrapping there and prevent two functions from being entirely indented by a `with ...:` statement. --- pyVoIP/SIP.py | 266 +++++++++++++++++++++++++------------------------- 1 file changed, 131 insertions(+), 135 deletions(-) diff --git a/pyVoIP/SIP.py b/pyVoIP/SIP.py index 9178d70..5aefe59 100644 --- a/pyVoIP/SIP.py +++ b/pyVoIP/SIP.py @@ -1652,7 +1652,8 @@ def bye(self, request: SIPMessage) -> None: def deregister(self) -> bool: try: - deregistered = self.__deregister() + with self.recvLock: + deregistered = self.__deregister() if not deregistered: debug("DEREGISTERATION FAILED") return False @@ -1672,64 +1673,62 @@ def deregister(self) -> bool: return False def __deregister(self) -> bool: - with self.recvLock: - self.phone._status = PhoneStatus.DEREGISTERING - firstRequest = self.genFirstRequest(deregister=True) - self.out.sendto( - firstRequest.encode("utf8"), (self.server, self.port) - ) + self.phone._status = PhoneStatus.DEREGISTERING + firstRequest = self.genFirstRequest(deregister=True) + self.out.sendto(firstRequest.encode("utf8"), (self.server, self.port)) + + self.out.setblocking(False) + + ready = select.select([self.out], [], [], self.register_timeout) + if ready[0]: + resp = self.s.recv(8192) + else: + raise TimeoutError("Deregistering on SIP Server timed out") - self.out.setblocking(False) + response = SIPMessage(resp) + response = self.trying_timeout_check(response) - ready = select.select([self.out], [], [], self.register_timeout) + if response.status == SIPStatus(401): + # Unauthorized, likely due to being password protected. + regRequest = self.genRegister(response, deregister=True) + self.out.sendto( + regRequest.encode("utf8"), (self.server, self.port) + ) + ready = select.select([self.s], [], [], self.register_timeout) if ready[0]: resp = self.s.recv(8192) + response = SIPMessage(resp) + if response.status == SIPStatus(401): + # At this point, it's reasonable to assume that + # this is caused by invalid credentials. + debug("Unauthorized") + raise InvalidAccountInfoError( + "Invalid Username or " + + "Password for SIP server " + + f"{self.server}:" + + f"{self.myPort}" + ) + elif response.status == SIPStatus(400): + # Bad Request + # TODO: implement + # TODO: check if broken connection can be brought back + # with new urn:uuid or reply with expire 0 + self._handle_bad_request() else: raise TimeoutError("Deregistering on SIP Server timed out") - response = SIPMessage(resp) - response = self.trying_timeout_check(response) - - if response.status == SIPStatus(401): - # Unauthorized, likely due to being password protected. - regRequest = self.genRegister(response, deregister=True) - self.out.sendto( - regRequest.encode("utf8"), (self.server, self.port) - ) - ready = select.select([self.s], [], [], self.register_timeout) - if ready[0]: - resp = self.s.recv(8192) - response = SIPMessage(resp) - if response.status == SIPStatus(401): - # At this point, it's reasonable to assume that - # this is caused by invalid credentials. - debug("Unauthorized") - raise InvalidAccountInfoError( - "Invalid Username or " - + "Password for SIP server " - + f"{self.server}:" - + f"{self.myPort}" - ) - elif response.status == SIPStatus(400): - # Bad Request - # TODO: implement - # TODO: check if broken connection can be brought back - # with new urn:uuid or reply with expire 0 - self._handle_bad_request() - else: - raise TimeoutError("Deregistering on SIP Server timed out") + if response.status == SIPStatus(500): + # We raise so the calling function can sleep and try again + raise RetryRequiredError("Response SIP status of 500") - if response.status == SIPStatus(500): - # We raise so the calling function can sleep and try again - raise RetryRequiredError("Response SIP status of 500") - - if response.status == SIPStatus.OK: - return True - return False + if response.status == SIPStatus.OK: + return True + return False def register(self) -> bool: try: - registered = self.__register() + with self.recvLock: + registered = self.__register() if not registered: debug("REGISTERATION FAILED") self.registerFailures += 1 @@ -1770,102 +1769,99 @@ def __start_register_timer(self, delay: Optional[int] = None): self.registerThread.start() def __register(self) -> bool: - with self.recvLock: - self.phone._status = PhoneStatus.REGISTERING - firstRequest = self.genFirstRequest() - self.out.sendto( - firstRequest.encode("utf8"), (self.server, self.port) - ) + self.phone._status = PhoneStatus.REGISTERING + firstRequest = self.genFirstRequest() + self.out.sendto(firstRequest.encode("utf8"), (self.server, self.port)) - self.out.setblocking(False) + self.out.setblocking(False) - ready = select.select([self.out], [], [], self.register_timeout) + ready = select.select([self.out], [], [], self.register_timeout) + if ready[0]: + resp = self.s.recv(8192) + else: + raise TimeoutError("Registering on SIP Server timed out") + + response = SIPMessage(resp) + response = self.trying_timeout_check(response) + first_response = response + + if response.status == SIPStatus(400): + # Bad Request + # TODO: implement + # TODO: check if broken connection can be brought back + # with new urn:uuid or reply with expire 0 + self._handle_bad_request() + + if response.status == SIPStatus(401): + # Unauthorized, likely due to being password protected. + regRequest = self.genRegister(response) + self.out.sendto( + regRequest.encode("utf8"), (self.server, self.port) + ) + ready = select.select([self.s], [], [], self.register_timeout) if ready[0]: resp = self.s.recv(8192) + response = SIPMessage(resp) + response = self.trying_timeout_check(response) + if response.status == SIPStatus(401): + # At this point, it's reasonable to assume that + # this is caused by invalid credentials. + debug("=" * 50) + debug("Unauthorized, SIP Message Log:\n") + debug("SENT") + debug(firstRequest) + debug("\nRECEIVED") + debug(first_response.summary()) + debug("\nSENT (DO NOT SHARE THIS PACKET)") + debug(regRequest) + debug("\nRECEIVED") + debug(response.summary()) + debug("=" * 50) + raise InvalidAccountInfoError( + "Invalid Username or " + + "Password for SIP server " + + f"{self.server}:" + + f"{self.myPort}" + ) + elif response.status == SIPStatus(400): + # Bad Request + # TODO: implement + # TODO: check if broken connection can be brought back + # with new urn:uuid or reply with expire 0 + self._handle_bad_request() else: raise TimeoutError("Registering on SIP Server timed out") - response = SIPMessage(resp) - response = self.trying_timeout_check(response) - first_response = response - - if response.status == SIPStatus(400): - # Bad Request - # TODO: implement - # TODO: check if broken connection can be brought back - # with new urn:uuid or reply with expire 0 - self._handle_bad_request() - - if response.status == SIPStatus(401): - # Unauthorized, likely due to being password protected. - regRequest = self.genRegister(response) - self.out.sendto( - regRequest.encode("utf8"), (self.server, self.port) - ) - ready = select.select([self.s], [], [], self.register_timeout) - if ready[0]: - resp = self.s.recv(8192) - response = SIPMessage(resp) - response = self.trying_timeout_check(response) - if response.status == SIPStatus(401): - # At this point, it's reasonable to assume that - # this is caused by invalid credentials. - debug("=" * 50) - debug("Unauthorized, SIP Message Log:\n") - debug("SENT") - debug(firstRequest) - debug("\nRECEIVED") - debug(first_response.summary()) - debug("\nSENT (DO NOT SHARE THIS PACKET)") - debug(regRequest) - debug("\nRECEIVED") - debug(response.summary()) - debug("=" * 50) - raise InvalidAccountInfoError( - "Invalid Username or " - + "Password for SIP server " - + f"{self.server}:" - + f"{self.myPort}" - ) - elif response.status == SIPStatus(400): - # Bad Request - # TODO: implement - # TODO: check if broken connection can be brought back - # with new urn:uuid or reply with expire 0 - self._handle_bad_request() - else: - raise TimeoutError("Registering on SIP Server timed out") - - if response.status == SIPStatus(407): - # Proxy Authentication Required - # TODO: implement - debug("Proxy auth required") - - # TODO: This must be done more reliable - if response.status not in [ - SIPStatus(400), - SIPStatus(401), - SIPStatus(407), - ]: - # Unauthorized - if response.status == SIPStatus(500): - # We raise so the calling function can sleep and try again - raise RetryRequiredError("Response SIP status of 500") - else: - # TODO: determine if needed here - self.parseMessage(response) + if response.status == SIPStatus(407): + # Proxy Authentication Required + # TODO: implement + debug("Proxy auth required") + + # TODO: This must be done more reliable + if response.status not in [ + SIPStatus(400), + SIPStatus(401), + SIPStatus(407), + ]: + # Unauthorized + if response.status == SIPStatus(500): + # We raise so the calling function can sleep and try again + raise RetryRequiredError("Response SIP status of 500") + else: + # TODO: determine if needed here + self.parseMessage(response) - debug(response.summary()) - debug(response.raw) + debug(response.summary()) + debug(response.raw) - if response.status == SIPStatus.OK: - return True - else: - raise InvalidAccountInfoError( - "Invalid Username or Password for " - + f"SIP server {self.server}:" - + f"{self.myPort}" - ) + if response.status == SIPStatus.OK: + return True + else: + raise InvalidAccountInfoError( + "Invalid Username or Password for " + + f"SIP server {self.server}:" + + f"{self.myPort}" + ) def _handle_bad_request(self) -> None: # Bad Request From a2abe0f2592649ccc16527b590f4461ab11817d5 Mon Sep 17 00:00:00 2001 From: Justin Bull Date: Mon, 4 Dec 2023 21:59:15 -0500 Subject: [PATCH 5/7] [CHANGE] Refactor recv() to use Lock context manager and simplify setblocking() Like the other preceding commits, this should eliminate the risk of missing release() and corresponding setblocking(True) calls for this function. --- pyVoIP/SIP.py | 60 +++++++++++++++++++++++++-------------------------- 1 file changed, 30 insertions(+), 30 deletions(-) diff --git a/pyVoIP/SIP.py b/pyVoIP/SIP.py index 5aefe59..250635e 100644 --- a/pyVoIP/SIP.py +++ b/pyVoIP/SIP.py @@ -846,42 +846,42 @@ def __init__( self.registerFailures = 0 self.recvLock = Lock() - def recv(self) -> None: + def recv_loop(self) -> None: while self.NSD: - # TODO: Can this safely be changed to use context manager syntax - # or does the BlockingIOError handler ruin that? - self.recvLock.acquire() - self.s.setblocking(False) try: - raw = self.s.recv(8192) - if raw != b"\x00\x00\x00\x00": - try: - message = SIPMessage(raw) - debug(message.summary()) - self.parseMessage(message) - except Exception as ex: - debug(f"Error on header parsing: {ex}") + with self.recvLock: + self.s.setblocking(False) + self.recv() except BlockingIOError: self.s.setblocking(True) - self.recvLock.release() time.sleep(0.01) continue - except SIPParseError as e: - if "SIP Version" in str(e): - request = self.genSIPVersionNotSupported(message) - self.out.sendto( - request.encode("utf8"), (self.server, self.port) - ) - else: - debug(f"SIPParseError in SIP.recv: {type(e)}, {e}") - except Exception as e: - debug(f"SIP.recv error: {type(e)}, {e}\n\n{str(raw, 'utf8')}") - if pyVoIP.DEBUG: - self.s.setblocking(True) - self.recvLock.release() - raise self.s.setblocking(True) - self.recvLock.release() + + def recv(self) -> None: + try: + raw = self.s.recv(8192) + if raw != b"\x00\x00\x00\x00": + try: + message = SIPMessage(raw) + debug(message.summary()) + self.parseMessage(message) + except Exception as ex: + debug(f"Error on header parsing: {ex}") + except SIPParseError as e: + if "SIP Version" in str(e): + request = self.genSIPVersionNotSupported(message) + self.out.sendto( + request.encode("utf8"), (self.server, self.port) + ) + else: + debug(f"SIPParseError in SIP.recv: {type(e)}, {e}") + except Exception as e: + debug(f"SIP.recv error: {type(e)}, {e}\n\n{str(raw, 'utf8')}") + # Re-raise BlockingIOError so recv_loop() can release locks and + # continue + if isinstance(e, BlockingIOError) or pyVoIP.DEBUG: + raise def parseMessage(self, message: SIPMessage) -> None: warnings.warn( @@ -961,7 +961,7 @@ def start(self) -> None: self.s.bind((self.myIP, self.myPort)) self.out = self.s self.register() - t = Timer(1, self.recv) + t = Timer(1, self.recv_loop) t.name = "SIP Recieve" t.start() From c18bd47408fa0dec6b6777fe8fc7dbf01871231f Mon Sep 17 00:00:00 2001 From: Justin Bull Date: Mon, 4 Dec 2023 22:23:39 -0500 Subject: [PATCH 6/7] [ADD] acquired_lock_and_unblocked_socket() to combine locking and blocking Since recv_loop pairs acquiring a lock and setting a socket as non-blocking together, as well as their corresponding release/blocking, combining them into a single context manager function makes it safer. --- pyVoIP/SIP.py | 6 ++---- pyVoIP/util.py | 20 ++++++++++++++++++++ tests/test_util.py | 31 +++++++++++++++++++++++++++++++ 3 files changed, 53 insertions(+), 4 deletions(-) create mode 100644 pyVoIP/util.py create mode 100644 tests/test_util.py diff --git a/pyVoIP/SIP.py b/pyVoIP/SIP.py index 250635e..876a1ea 100644 --- a/pyVoIP/SIP.py +++ b/pyVoIP/SIP.py @@ -1,6 +1,7 @@ from enum import Enum, IntEnum from threading import Timer, Lock from typing import Any, Callable, Dict, List, Optional, Tuple, TYPE_CHECKING +from pyVoIP.util import acquired_lock_and_unblocked_socket from pyVoIP.VoIP.status import PhoneStatus import pyVoIP import hashlib @@ -849,14 +850,11 @@ def __init__( def recv_loop(self) -> None: while self.NSD: try: - with self.recvLock: - self.s.setblocking(False) + with acquired_lock_and_unblocked_socket(self.recvLock, self.s): self.recv() except BlockingIOError: - self.s.setblocking(True) time.sleep(0.01) continue - self.s.setblocking(True) def recv(self) -> None: try: diff --git a/pyVoIP/util.py b/pyVoIP/util.py new file mode 100644 index 0000000..fa7c6cb --- /dev/null +++ b/pyVoIP/util.py @@ -0,0 +1,20 @@ +from contextlib import contextmanager +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from socket import socket + from threading import Lock + + +@contextmanager +def acquired_lock_and_unblocked_socket(lock: "Lock", socket: "socket"): + """Alongside an acquired Lock, a corresponding socket will become + non-blocking, and then blocking once the Lock is released. + + Lock will release and socket will become blocking even during exceptions""" + try: + with lock: + socket.setblocking(False) + yield + finally: + socket.setblocking(True) diff --git a/tests/test_util.py b/tests/test_util.py new file mode 100644 index 0000000..74c016c --- /dev/null +++ b/tests/test_util.py @@ -0,0 +1,31 @@ +from pyVoIP.util import acquired_lock_and_unblocked_socket +from threading import Lock +from socket import socket +import pytest + + +def test_acquired_lock_and_unblocked_socket(): + l = Lock() + s = socket() + assert l.locked() is False + assert s.getblocking() is True + with acquired_lock_and_unblocked_socket(l, s): + assert l.locked() is True + assert s.getblocking() is False + assert l.locked() is False + assert s.getblocking() is True + + +def test_acquired_lock_and_unblocked_socket__with_exception(): + l = Lock() + s = socket() + assert l.locked() is False + assert s.getblocking() is True + with pytest.raises(Exception): + with acquired_lock_and_unblocked_socket(l, s): + assert l.locked() is True + assert s.getblocking() is False + raise Exception("Uh oh") + assert False, "Should never execute" + assert l.locked() is False + assert s.getblocking() is True From 661b49d0e2390591e3d121a32f17d8284f4c16a1 Mon Sep 17 00:00:00 2001 From: Justin Bull Date: Mon, 4 Dec 2023 22:41:13 -0500 Subject: [PATCH 7/7] [FIX] UnboundLocalError from `raw` not existing on BlockingIOError case --- pyVoIP/SIP.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/pyVoIP/SIP.py b/pyVoIP/SIP.py index 876a1ea..7dc7693 100644 --- a/pyVoIP/SIP.py +++ b/pyVoIP/SIP.py @@ -874,11 +874,13 @@ def recv(self) -> None: ) else: debug(f"SIPParseError in SIP.recv: {type(e)}, {e}") - except Exception as e: - debug(f"SIP.recv error: {type(e)}, {e}\n\n{str(raw, 'utf8')}") + except BlockingIOError: # Re-raise BlockingIOError so recv_loop() can release locks and # continue - if isinstance(e, BlockingIOError) or pyVoIP.DEBUG: + raise + except Exception as e: + debug(f"SIP.recv error: {type(e)}, {e}\n\n{str(raw, 'utf8')}") + if pyVoIP.DEBUG: raise def parseMessage(self, message: SIPMessage) -> None: