From 4febbbe9f4d9279906c9eee57fc8384290834c73 Mon Sep 17 00:00:00 2001 From: Jamin Hitchcock Date: Sun, 5 Jan 2025 21:59:43 -0600 Subject: [PATCH 01/10] VOIP Assist Satellite Announce support Updates to make assist satellite announcements work with the VOIP integration. Allow using the existing SIP transport for Home Assistant for both incoming and outgoing calls. --- voip_utils/sip.py | 381 +++++++++++++++++++++++++++++++++++---------- voip_utils/voip.py | 47 +++--- 2 files changed, 328 insertions(+), 100 deletions(-) diff --git a/voip_utils/sip.py b/voip_utils/sip.py index 3186d62..eefb0ca 100644 --- a/voip_utils/sip.py +++ b/voip_utils/sip.py @@ -79,6 +79,8 @@ class CallInfo: server_ip: str headers: dict[str, str] opus_payload_type: int = OPUS_PAYLOAD_TYPE + local_rtp_ip: str = None + local_rtp_port: int = None @property def caller_rtcp_port(self) -> int: @@ -95,6 +97,11 @@ def caller_sip_port(self) -> int: """SIP port of caller.""" return self.caller_endpoint.port + @property + def local_rtcp_port(self) -> int | None: + """Get the local RTCP port.""" + return self.local_rtp_port + 1 if self.local_rtp_port is not None else None + @dataclass class RtpInfo: @@ -168,6 +175,78 @@ def __init__(self, sdp_info: SdpInfo) -> None: """Set up SIP server.""" self.sdp_info = sdp_info self.transport = None + self._outgoing_calls: dict[str,int] = {} + + def outgoing_call(self, source: SipEndpoint, destination: SipEndpoint, rtp_port: int): + """Make an outgoing call from the given source endpoint to the destination endpoint, using the rtp_port for the local RTP port of the call.""" + session_id = str(time.monotonic_ns()) + session_version = session_id + call_id = session_id + self._register_outgoing_call(call_id, rtp_port) + if self.transport is None: + _LOGGER.warn("No transport for outgoing VOIP calls") + return + + sdp_lines = [ + "v=0", + f"o={source.username} {session_id} {session_version} IN IP4 {source.host}", + "s=Talk", + f"c=IN IP4 {source.host}", + "t=0 0", + f"m=audio {rtp_port} RTP/AVP 123 96 101 103 104", + "a=sendrecv", + "a=rtpmap:96 opus/48000/2", + "a=fmtp:96 useinbandfec=0", + "a=rtpmap:123 opus/48000/2", + "a=fmtp:123 maxplaybackrate=16000", + "a=rtpmap:101 telephone-event/48000", + "a=rtpmap:103 telephone-event/16000", + "a=rtpmap:104 telephone-event/8000", + "a=ptime:20", + "", + ] + sdp_text = _CRLF.join(sdp_lines) + sdp_bytes = sdp_text.encode("utf-8") + + invite_lines = [ + f"INVITE {destination.uri} SIP/2.0", + f"Via: SIP/2.0/UDP {source.host}:{source.port}", + f"From: {source.sip_header}", + f"Contact: {source.sip_header}", + f"To: {destination.sip_header}", + f"Call-ID: {call_id}", + "CSeq: 50 INVITE", + "User-Agent: test-agent 1.0", + "Allow: INVITE, ACK, OPTIONS, CANCEL, BYE, SUBSCRIBE, NOTIFY, INFO, REFER, UPDATE", + "Accept: application/sdp, application/dtmf-relay", + "Content-Type: application/sdp", + f"Content-Length: {len(sdp_bytes)}", + "", + ] + invite_text = _CRLF.join(invite_lines) + _CRLF + invite_bytes = invite_text.encode("utf-8") + + msg_bytes = invite_bytes + sdp_bytes + + _LOGGER.debug(msg_bytes) + + self.transport.sendto( + msg_bytes, + (destination.host, destination.port), + ) + + + def _register_outgoing_call(self, call_id: str, rtp_port: int): + """Register the RTP port associated with an outgoing call.""" + self._outgoing_calls[call_id] = rtp_port + + def _get_call_rtp_port(self, call_id: str): + """Get the RTP port associated with an outgoing call.""" + return self._outgoing_calls.get(call_id) + + def _end_outgoing_call(self, call_id: str): + """Register the end of an outgoing call.""" + self._outgoing_calls.pop(call_id, None) def connection_made(self, transport): """Server ready.""" @@ -190,87 +269,199 @@ def datagram_received(self, data: bytes, addr): if method: method = method.lower() - if method != "invite": - # Not an INVITE message - return - - if not ruri: - raise ValueError("Empty receiver URI") - - caller_endpoint = None - # The From header should give us the URI used for sending SIP messages to the device - if headers.get("from") is not None: - caller_endpoint = SipEndpoint(headers.get("from", "")) - # We can try using the Contact header as a fallback - elif headers.get("contact") is not None: - caller_endpoint = SipEndpoint(headers.get("contact", "")) - # If all else fails try to generate a URI based on the IP and port from the address the message came from - else: - caller_endpoint = get_sip_endpoint(caller_ip, port=caller_sip_port) - - _LOGGER.debug("Incoming call from endpoint=%s", caller_endpoint) - - # Extract caller's RTP port from SDP. - # See: https://datatracker.ietf.org/doc/html/rfc2327 - caller_rtp_port: Optional[int] = None - opus_payload_type = OPUS_PAYLOAD_TYPE - body_lines = body.splitlines() - for line in body_lines: - line = line.strip() - if line: - key, value = line.split("=", maxsplit=1) - if key == "m": - parts = value.split() - if parts[0] == "audio": - caller_rtp_port = int(parts[1]) - elif key == "a" and value.startswith("rtpmap:"): - # a=rtpmap:123 opus/48000/2 - codec_str = value.split(":", maxsplit=1)[1] - codec_parts = codec_str.split() - if (len(codec_parts) > 1) and ( - codec_parts[1].lower().startswith("opus") - ): - opus_payload_type = int(codec_parts[0]) - _LOGGER.debug( - "Detected OPUS payload type as %s", opus_payload_type - ) - - if caller_rtp_port is None: - raise VoipError("No caller RTP port") - - # Extract host from ruri - # sip:user@123.123.123.123:1234 - re_splituri = re.compile( - r"(?P\w+):" # Scheme - + r"(?:(?P[\w\.]+):?(?P[\w\.]+)?@)?" # User:Password - + r"\[?(?P" # Begin group host - + r"(?:\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})|" # IPv4 address Host Or - + r"(?:(?:[0-9a-fA-F]{1,4}):){7}[0-9a-fA-F]{1,4}|" # IPv6 address Host Or - + r"(?:(?:[0-9A-Za-z]+\.)+[0-9A-Za-z]+)" # Hostname string - + r")\]?:?" # End group host - + r"(?P\d{1,6})?" # port - + r"(?:\;(?P[^\?]*))?" # parameters - + r"(?:\?(?P.*))?" # headers - ) - re_uri = re_splituri.search(ruri) - if re_uri is None: - raise ValueError("Receiver URI did not match expected pattern") - - server_ip = re_uri.group("host") - if not is_ipv4_address(server_ip): - raise VoipError(f"Invalid IPv4 address in {ruri}") - - self.on_call( - CallInfo( - caller_endpoint=caller_endpoint, - caller_rtp_port=caller_rtp_port, - server_ip=server_ip, - headers=headers, - opus_payload_type=opus_payload_type, + if method == "invite": + # An invite message means someone called HA + _LOGGER.debug("Received invite message") + if not ruri: + raise ValueError("Empty receiver URI") + + caller_endpoint = None + # The From header should give us the URI used for sending SIP messages to the device + if headers.get("from") is not None: + caller_endpoint = SipEndpoint(headers.get("from", "")) + # We can try using the Contact header as a fallback + elif headers.get("contact") is not None: + caller_endpoint = SipEndpoint(headers.get("contact", "")) + # If all else fails try to generate a URI based on the IP and port from the address the message came from + else: + caller_endpoint = get_sip_endpoint(caller_ip, port=caller_sip_port) + + _LOGGER.debug("Incoming call from endpoint=%s", caller_endpoint) + + # Extract caller's RTP port from SDP. + # See: https://datatracker.ietf.org/doc/html/rfc2327 + caller_rtp_port: Optional[int] = None + opus_payload_type = OPUS_PAYLOAD_TYPE + body_lines = body.splitlines() + for line in body_lines: + line = line.strip() + if line: + key, value = line.split("=", maxsplit=1) + if key == "m": + parts = value.split() + if parts[0] == "audio": + caller_rtp_port = int(parts[1]) + elif key == "a" and value.startswith("rtpmap:"): + # a=rtpmap:123 opus/48000/2 + codec_str = value.split(":", maxsplit=1)[1] + codec_parts = codec_str.split() + if (len(codec_parts) > 1) and ( + codec_parts[1].lower().startswith("opus") + ): + opus_payload_type = int(codec_parts[0]) + _LOGGER.debug( + "Detected OPUS payload type as %s", opus_payload_type + ) + + if caller_rtp_port is None: + raise VoipError("No caller RTP port") + + # Extract host from ruri + # sip:user@123.123.123.123:1234 + re_splituri = re.compile( + r"(?P\w+):" # Scheme + + r"(?:(?P[\w\.]+):?(?P[\w\.]+)?@)?" # User:Password + + r"\[?(?P" # Begin group host + + r"(?:\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})|" # IPv4 address Host Or + + r"(?:(?:[0-9a-fA-F]{1,4}):){7}[0-9a-fA-F]{1,4}|" # IPv6 address Host Or + + r"(?:(?:[0-9A-Za-z]+\.)+[0-9A-Za-z]+)" # Hostname string + + r")\]?:?" # End group host + + r"(?P\d{1,6})?" # port + + r"(?:\;(?P[^\?]*))?" # parameters + + r"(?:\?(?P.*))?" # headers ) - ) + re_uri = re_splituri.search(ruri) + if re_uri is None: + raise ValueError("Receiver URI did not match expected pattern") + + server_ip = re_uri.group("host") + if not is_ipv4_address(server_ip): + raise VoipError(f"Invalid IPv4 address in {ruri}") + + self.on_call( + CallInfo( + caller_endpoint=caller_endpoint, + caller_rtp_port=caller_rtp_port, + server_ip=server_ip, + headers=headers, + opus_payload_type=opus_payload_type, + ) + ) + elif method == "sip/2.0": + # Reply message means we must have received a response to someone we called + # TODO: Verify that the call / sequence IDs match our outgoing INVITE + _LOGGER.debug("Received INVITE response [%s]", message) + is_ok = False + protocol, code, reason, headers, body = self._parse_sip_reply(message) + if (code == "200") and (reason == "OK"): + is_ok = True + if not is_ok: + _LOGGER.debug("Received non-OK response [%s]", message) + return + + _LOGGER.debug("Got OK message") + if self.transport is None: + _LOGGER.debug("No transport for exchanging SIP message") + return + rtp_info = get_rtp_info(body) + remote_rtp_ip = rtp_info.rtp_ip + remote_rtp_port = rtp_info.rtp_port + opus_payload_type = rtp_info.payload_type + to_header = headers["to"] + caller_endpoint = None + # The From header should give us the URI used for sending SIP messages to the device + if headers.get("to") is not None: + caller_endpoint = SipEndpoint(headers.get("to", "")) + else: + caller_endpoint = get_sip_endpoint(caller_ip, port=caller_sip_port) + + _LOGGER.debug("Outgoing call to endpoint=%s", caller_endpoint) + call_id = headers["call-id"] + ack_lines = [ + f"ACK {caller_endpoint.uri} SIP/2.0", + f"Via: SIP/2.0/UDP {self.local_endpoint.host}:{self.local_endpoint.port}", + f"From: {self.local_endpoint.sip_header}", + f"To: {to_header}", + f"Call-ID: {call_id}", + "CSeq: 50 ACK", + "User-Agent: test-agent 1.0", + "Content-Length: 0", + ] + ack_text = _CRLF.join(ack_lines) + _CRLF + ack_bytes = ack_text.encode("utf-8") + self.transport.sendto( + ack_bytes, (caller_ip, caller_sip_port) + ) + + # The call been answered, proceed with desired action here + local_rtp_port = self._get_call_rtp_port(call_id) + self.on_call( + CallInfo( + caller_endpoint=caller_endpoint, + caller_rtp_port=remote_rtp_port, + server_ip=remote_rtp_ip, + headers=headers, + opus_payload_type=opus_payload_type, # Should probably update this to eventually support more codecs + local_rtp_ip=self.local_endpoint.host, + local_rtp_port=local_rtp_port + ) + ) + elif method == "bye": + # Acknowlege the BYE message when the remote party hangs up + _LOGGER.debug("Received BYE message: %s", message) + if self.transport is None: + _LOGGER.debug("Skipping message: %s", message) + return + + # Acknowledge the BYE message, otherwise the phone will keep sending it + ( + protocol, + code, + reason, + headers, + body, + ) = self._parse_sip_reply(message) + _LOGGER.debug( + "Parsed response protocol=%s code=%s reason=%s headers=[%s] body=[%s]", + protocol, + code, + reason, + headers, + body, + ) + rtp_info = get_rtp_info(body) + remote_rtp_port = rtp_info.rtp_port + opus_payload_type = rtp_info.payload_type + via_header = headers["via"] + from_header = headers["from"] + to_header = headers["to"] + callid_header = headers["call-id"] + cseq_header = headers["cseq"] + # We should remove the call from the outgoing calls dict now if it is there + self._end_outgoing_call(callid_header) + ok_lines = [ + "SIP/2.0 200 OK", + f"Via: {via_header}", + f"From: {from_header}", + f"To: {to_header}", + f"Call-ID: {callid_header}", + f"CSeq: {cseq_header}", + "User-Agent: test-agent 1.0", + "Content-Length: 0", + ] + ok_text = _CRLF.join(ok_lines) + _CRLF + ok_bytes = ok_text.encode("utf-8") + # We should probably tell the associated RTP server to shutdown at this point, assuming we aren't reusing it for other calls + _LOGGER.debug("Sending OK for BYE message: %s", ok_text) + self.transport.sendto( + ok_bytes, + (caller_ip, caller_sip_port), + ) + # The transport might be used for incoming calls + # as well, so we should leave it open. + except Exception: - _LOGGER.exception("Unexpected error handling SIP INVITE") + _LOGGER.exception("Unexpected error handling SIP message") @abstractmethod def on_call(self, call_info: CallInfo): @@ -363,6 +554,38 @@ def _parse_sip( body = message[offset:] return method, ruri, headers, body + + def _parse_sip_reply( + self, message: str + ) -> Tuple[Optional[str], Optional[str], Optional[str], Dict[str, str], str]: + """Parse SIP message and return method, headers, and body.""" + lines = message.splitlines() + + protocol: Optional[str] = None + code: Optional[str] = None + reason: Optional[str] = None + headers: dict[str, str] = {} + offset: int = 0 + + # See: https://datatracker.ietf.org/doc/html/rfc3261 + for i, line in enumerate(lines): + if line: + offset += len(line) + len(_CRLF) + + if i == 0: + line_parts = line.split() + protocol = line_parts[0] + code = line_parts[1] + reason = line_parts[2] + elif not line: + break + else: + key, value = line.split(":", maxsplit=1) + headers[key.lower()] = value.strip() + + body = message[offset:] + + return protocol, code, reason, headers, body class CallPhoneDatagramProtocol(asyncio.DatagramProtocol, ABC): diff --git a/voip_utils/voip.py b/voip_utils/voip.py index 64ab7a2..1940e50 100644 --- a/voip_utils/voip.py +++ b/voip_utils/voip.py @@ -57,34 +57,39 @@ def on_call(self, call_info: CallInfo): _LOGGER.debug("Call rejected: %s", call_info) return - # Find free RTP/RTCP ports rtp_ip = "" - rtp_port = 0 + if (call_info.local_rtp_port is None): + # Find free RTP/RTCP ports + rtp_port = 0 - while True: - sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) - sock.setblocking(False) + while True: + sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + sock.setblocking(False) - # Bind to a random UDP port - sock.bind(("", 0)) - rtp_ip, rtp_port = sock.getsockname() + # Bind to a random UDP port + sock.bind(("", 0)) + rtp_ip, rtp_port = sock.getsockname() - # Close socket to free port for re-use - sock.close() + # Close socket to free port for re-use + sock.close() - # Check that the next port up is available for RTCP - sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) - try: - sock.bind(("", rtp_port + 1)) + # Check that the next port up is available for RTCP + sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + try: + sock.bind(("", rtp_port + 1)) - # Will be opened again below - sock.close() + # Will be opened again below + sock.close() - # Found our ports - break - except OSError: - # RTCP port is taken - pass + # Found our ports + break + except OSError: + # RTCP port is taken + pass + + else: + rtp_ip = call_info.local_rtp_ip + rtp_port = call_info.local_rtp_port _LOGGER.debug( "Starting RTP server on ip=%s, rtp_port=%s, rtcp_port=%s", From ce3269de927767ad1cb4f940448970c8556d127c Mon Sep 17 00:00:00 2001 From: Jamin Hitchcock Date: Thu, 9 Jan 2025 22:00:16 -0600 Subject: [PATCH 02/10] Fix linting issues --- voip_utils/sip.py | 37 +++++++++++++++++++++---------------- voip_utils/voip.py | 4 ++-- 2 files changed, 23 insertions(+), 18 deletions(-) diff --git a/voip_utils/sip.py b/voip_utils/sip.py index eefb0ca..a15e0e9 100644 --- a/voip_utils/sip.py +++ b/voip_utils/sip.py @@ -79,8 +79,8 @@ class CallInfo: server_ip: str headers: dict[str, str] opus_payload_type: int = OPUS_PAYLOAD_TYPE - local_rtp_ip: str = None - local_rtp_port: int = None + local_rtp_ip: str | None = None + local_rtp_port: int | None = None @property def caller_rtcp_port(self) -> int: @@ -175,16 +175,18 @@ def __init__(self, sdp_info: SdpInfo) -> None: """Set up SIP server.""" self.sdp_info = sdp_info self.transport = None - self._outgoing_calls: dict[str,int] = {} + self._outgoing_calls: dict[str, int] = {} - def outgoing_call(self, source: SipEndpoint, destination: SipEndpoint, rtp_port: int): + def outgoing_call( + self, source: SipEndpoint, destination: SipEndpoint, rtp_port: int + ): """Make an outgoing call from the given source endpoint to the destination endpoint, using the rtp_port for the local RTP port of the call.""" session_id = str(time.monotonic_ns()) session_version = session_id call_id = session_id self._register_outgoing_call(call_id, rtp_port) if self.transport is None: - _LOGGER.warn("No transport for outgoing VOIP calls") + _LOGGER.warning("No transport for outgoing VOIP calls") return sdp_lines = [ @@ -235,7 +237,6 @@ def outgoing_call(self, source: SipEndpoint, destination: SipEndpoint, rtp_port: (destination.host, destination.port), ) - def _register_outgoing_call(self, call_id: str, rtp_port: int): """Register the RTP port associated with an outgoing call.""" self._outgoing_calls[call_id] = rtp_port @@ -310,7 +311,8 @@ def datagram_received(self, data: bytes, addr): ): opus_payload_type = int(codec_parts[0]) _LOGGER.debug( - "Detected OPUS payload type as %s", opus_payload_type + "Detected OPUS payload type as %s", + opus_payload_type, ) if caller_rtp_port is None: @@ -369,18 +371,23 @@ def datagram_received(self, data: bytes, addr): opus_payload_type = rtp_info.payload_type to_header = headers["to"] caller_endpoint = None - # The From header should give us the URI used for sending SIP messages to the device if headers.get("to") is not None: caller_endpoint = SipEndpoint(headers.get("to", "")) else: caller_endpoint = get_sip_endpoint(caller_ip, port=caller_sip_port) + # The From header should give us the URI used for sending SIP messages to the device + local_endpoint = None + if headers.get("from") is not None: + local_endpoint = SipEndpoint(headers.get("from", "")) + else: + local_endpoint = get_sip_endpoint(caller_ip, port=caller_sip_port) _LOGGER.debug("Outgoing call to endpoint=%s", caller_endpoint) call_id = headers["call-id"] ack_lines = [ f"ACK {caller_endpoint.uri} SIP/2.0", - f"Via: SIP/2.0/UDP {self.local_endpoint.host}:{self.local_endpoint.port}", - f"From: {self.local_endpoint.sip_header}", + f"Via: SIP/2.0/UDP {local_endpoint.host}:{local_endpoint.port}", + f"From: {local_endpoint.sip_header}", f"To: {to_header}", f"Call-ID: {call_id}", "CSeq: 50 ACK", @@ -389,9 +396,7 @@ def datagram_received(self, data: bytes, addr): ] ack_text = _CRLF.join(ack_lines) + _CRLF ack_bytes = ack_text.encode("utf-8") - self.transport.sendto( - ack_bytes, (caller_ip, caller_sip_port) - ) + self.transport.sendto(ack_bytes, (caller_ip, caller_sip_port)) # The call been answered, proceed with desired action here local_rtp_port = self._get_call_rtp_port(call_id) @@ -402,8 +407,8 @@ def datagram_received(self, data: bytes, addr): server_ip=remote_rtp_ip, headers=headers, opus_payload_type=opus_payload_type, # Should probably update this to eventually support more codecs - local_rtp_ip=self.local_endpoint.host, - local_rtp_port=local_rtp_port + local_rtp_ip=local_endpoint.host, + local_rtp_port=local_rtp_port, ) ) elif method == "bye": @@ -554,7 +559,7 @@ def _parse_sip( body = message[offset:] return method, ruri, headers, body - + def _parse_sip_reply( self, message: str ) -> Tuple[Optional[str], Optional[str], Optional[str], Dict[str, str], str]: diff --git a/voip_utils/voip.py b/voip_utils/voip.py index 1940e50..275408e 100644 --- a/voip_utils/voip.py +++ b/voip_utils/voip.py @@ -58,7 +58,7 @@ def on_call(self, call_info: CallInfo): return rtp_ip = "" - if (call_info.local_rtp_port is None): + if call_info.local_rtp_port is None: # Find free RTP/RTCP ports rtp_port = 0 @@ -88,7 +88,7 @@ def on_call(self, call_info: CallInfo): pass else: - rtp_ip = call_info.local_rtp_ip + rtp_ip = call_info.local_rtp_ip if call_info.local_rtp_ip else "" rtp_port = call_info.local_rtp_port _LOGGER.debug( From 4290d149ac876dfe76bcf9cea45cf6140b91955d Mon Sep 17 00:00:00 2001 From: Jamin Hitchcock Date: Sun, 12 Jan 2025 13:31:56 -0600 Subject: [PATCH 03/10] Add SipMessage data class. --- voip_utils/sip.py | 269 +++++++++++++++++++--------------------------- 1 file changed, 109 insertions(+), 160 deletions(-) diff --git a/voip_utils/sip.py b/voip_utils/sip.py index a15e0e9..9810bba 100644 --- a/voip_utils/sip.py +++ b/voip_utils/sip.py @@ -19,6 +19,8 @@ _LOGGER = logging.getLogger(__name__) _CRLF = "\r\n" +VOIP_UTILS_AGENT = "voip-utils" + @dataclass class SdpInfo: @@ -70,6 +72,55 @@ def __post_init__(self): raise ValueError("Invalid SIP header") +@dataclass +class SipMessage: + """Data parsed from a SIP message.""" + + protocol: str + method: Optional[str] + request_uri: Optional[str] + code: Optional[str] + reason: Optional[str] + headers: dict[str, str] + body: str + + @staticmethod + def parse_sip(message: str) -> SipMessage: + """Parse a SIP message into a SipMessage object.""" + lines = message.splitlines() + + method: Optional[str] = None + request_uri: Optional[str] = None + code: Optional[str] = None + reason: Optional[str] = None + headers: dict[str, str] = {} + offset: int = 0 + + # See: https://datatracker.ietf.org/doc/html/rfc3261 + for i, line in enumerate(lines): + if line: + offset += len(line) + len(_CRLF) + + if i == 0: + line_parts = line.split() + if line_parts[0].startswith("SIP"): + protocol = line_parts[0] + code = line_parts[1] + reason = line_parts[2] + else: + method = line_parts[0] + request_uri = line_parts[1] + protocol = line_parts[2] + elif not line: + break + else: + key, value = line.split(":", maxsplit=1) + headers[key.lower()] = value.strip() + + body = message[offset:] + return SipMessage(protocol, method, request_uri, code, reason, headers, body) + + @dataclass class CallInfo: """Information gathered from an INVITE message.""" @@ -181,13 +232,14 @@ def outgoing_call( self, source: SipEndpoint, destination: SipEndpoint, rtp_port: int ): """Make an outgoing call from the given source endpoint to the destination endpoint, using the rtp_port for the local RTP port of the call.""" + if self.transport is None: + _LOGGER.warning("No transport for outgoing VOIP calls") + raise RuntimeError("No transport available for outgoing call.") + session_id = str(time.monotonic_ns()) session_version = session_id call_id = session_id self._register_outgoing_call(call_id, rtp_port) - if self.transport is None: - _LOGGER.warning("No transport for outgoing VOIP calls") - return sdp_lines = [ "v=0", @@ -218,7 +270,7 @@ def outgoing_call( f"To: {destination.sip_header}", f"Call-ID: {call_id}", "CSeq: 50 INVITE", - "User-Agent: test-agent 1.0", + f"User-Agent: {VOIP_UTILS_AGENT} 1.0", "Allow: INVITE, ACK, OPTIONS, CANCEL, BYE, SUBSCRIBE, NOTIFY, INFO, REFER, UPDATE", "Accept: application/sdp, application/dtmf-relay", "Content-Type: application/sdp", @@ -241,7 +293,7 @@ def _register_outgoing_call(self, call_id: str, rtp_port: int): """Register the RTP port associated with an outgoing call.""" self._outgoing_calls[call_id] = rtp_port - def _get_call_rtp_port(self, call_id: str): + def _get_call_rtp_port(self, call_id: str) -> int | None: """Get the RTP port associated with an outgoing call.""" return self._outgoing_calls.get(call_id) @@ -258,31 +310,34 @@ def datagram_received(self, data: bytes, addr): try: caller_ip, caller_sip_port = addr message = data.decode("utf-8") - method, ruri, headers, body = self._parse_sip(message) + smsg = SipMessage.parse_sip(message) _LOGGER.debug( - "Received datagram method=%s, ruri=%s, headers=%s, body=%s", - method, - ruri, - headers, - body, + "Received datagram protocol=[%s], method=[%s], ruri=[%s], code=[%s], reason=[%s], headers=[%s], body=[%s]", + smsg.protocol, + smsg.method, + smsg.request_uri, + smsg.code, + smsg.reason, + smsg.headers, + smsg.body, ) - - if method: + method = smsg.method + if method is not None: method = method.lower() if method == "invite": # An invite message means someone called HA _LOGGER.debug("Received invite message") - if not ruri: + if not smsg.request_uri: raise ValueError("Empty receiver URI") caller_endpoint = None # The From header should give us the URI used for sending SIP messages to the device - if headers.get("from") is not None: - caller_endpoint = SipEndpoint(headers.get("from", "")) + if smsg.headers.get("from") is not None: + caller_endpoint = SipEndpoint(smsg.headers.get("from", "")) # We can try using the Contact header as a fallback - elif headers.get("contact") is not None: - caller_endpoint = SipEndpoint(headers.get("contact", "")) + elif smsg.headers.get("contact") is not None: + caller_endpoint = SipEndpoint(smsg.headers.get("contact", "")) # If all else fails try to generate a URI based on the IP and port from the address the message came from else: caller_endpoint = get_sip_endpoint(caller_ip, port=caller_sip_port) @@ -293,7 +348,7 @@ def datagram_received(self, data: bytes, addr): # See: https://datatracker.ietf.org/doc/html/rfc2327 caller_rtp_port: Optional[int] = None opus_payload_type = OPUS_PAYLOAD_TYPE - body_lines = body.splitlines() + body_lines = smsg.body.splitlines() for line in body_lines: line = line.strip() if line: @@ -332,66 +387,61 @@ def datagram_received(self, data: bytes, addr): + r"(?:\;(?P[^\?]*))?" # parameters + r"(?:\?(?P.*))?" # headers ) - re_uri = re_splituri.search(ruri) + re_uri = re_splituri.search(smsg.request_uri) if re_uri is None: raise ValueError("Receiver URI did not match expected pattern") server_ip = re_uri.group("host") if not is_ipv4_address(server_ip): - raise VoipError(f"Invalid IPv4 address in {ruri}") + raise VoipError(f"Invalid IPv4 address in {smsg.request_uri}") self.on_call( CallInfo( caller_endpoint=caller_endpoint, caller_rtp_port=caller_rtp_port, server_ip=server_ip, - headers=headers, + headers=smsg.headers, opus_payload_type=opus_payload_type, ) ) - elif method == "sip/2.0": + elif method is None: # Reply message means we must have received a response to someone we called # TODO: Verify that the call / sequence IDs match our outgoing INVITE - _LOGGER.debug("Received INVITE response [%s]", message) - is_ok = False - protocol, code, reason, headers, body = self._parse_sip_reply(message) - if (code == "200") and (reason == "OK"): - is_ok = True + _LOGGER.debug("Received response [%s]", message) + is_ok = smsg.code == "200" and smsg.reason == "OK" if not is_ok: _LOGGER.debug("Received non-OK response [%s]", message) return _LOGGER.debug("Got OK message") if self.transport is None: - _LOGGER.debug("No transport for exchanging SIP message") + _LOGGER.warning("No transport for exchanging SIP message") return - rtp_info = get_rtp_info(body) + rtp_info = get_rtp_info(smsg.body) remote_rtp_ip = rtp_info.rtp_ip remote_rtp_port = rtp_info.rtp_port opus_payload_type = rtp_info.payload_type - to_header = headers["to"] caller_endpoint = None - if headers.get("to") is not None: - caller_endpoint = SipEndpoint(headers.get("to", "")) + if smsg.headers.get("to") is not None: + caller_endpoint = SipEndpoint(smsg.headers.get("to", "")) else: caller_endpoint = get_sip_endpoint(caller_ip, port=caller_sip_port) # The From header should give us the URI used for sending SIP messages to the device local_endpoint = None - if headers.get("from") is not None: - local_endpoint = SipEndpoint(headers.get("from", "")) + if smsg.headers.get("from") is not None: + local_endpoint = SipEndpoint(smsg.headers.get("from", "")) else: local_endpoint = get_sip_endpoint(caller_ip, port=caller_sip_port) _LOGGER.debug("Outgoing call to endpoint=%s", caller_endpoint) - call_id = headers["call-id"] ack_lines = [ f"ACK {caller_endpoint.uri} SIP/2.0", f"Via: SIP/2.0/UDP {local_endpoint.host}:{local_endpoint.port}", f"From: {local_endpoint.sip_header}", - f"To: {to_header}", - f"Call-ID: {call_id}", + f"To: {smsg.headers['to']}", + f"Call-ID: {smsg.headers['call-id']}", "CSeq: 50 ACK", - "User-Agent: test-agent 1.0", + f"User-Agent: {VOIP_UTILS_AGENT} 1.0", "Content-Length: 0", ] ack_text = _CRLF.join(ack_lines) + _CRLF @@ -399,13 +449,13 @@ def datagram_received(self, data: bytes, addr): self.transport.sendto(ack_bytes, (caller_ip, caller_sip_port)) # The call been answered, proceed with desired action here - local_rtp_port = self._get_call_rtp_port(call_id) + local_rtp_port = self._get_call_rtp_port(smsg.headers["call-id"]) self.on_call( CallInfo( caller_endpoint=caller_endpoint, caller_rtp_port=remote_rtp_port, server_ip=remote_rtp_ip, - headers=headers, + headers=smsg.headers, opus_payload_type=opus_payload_type, # Should probably update this to eventually support more codecs local_rtp_ip=local_endpoint.host, local_rtp_port=local_rtp_port, @@ -419,39 +469,19 @@ def datagram_received(self, data: bytes, addr): return # Acknowledge the BYE message, otherwise the phone will keep sending it - ( - protocol, - code, - reason, - headers, - body, - ) = self._parse_sip_reply(message) - _LOGGER.debug( - "Parsed response protocol=%s code=%s reason=%s headers=[%s] body=[%s]", - protocol, - code, - reason, - headers, - body, - ) - rtp_info = get_rtp_info(body) + rtp_info = get_rtp_info(smsg.body) remote_rtp_port = rtp_info.rtp_port opus_payload_type = rtp_info.payload_type - via_header = headers["via"] - from_header = headers["from"] - to_header = headers["to"] - callid_header = headers["call-id"] - cseq_header = headers["cseq"] # We should remove the call from the outgoing calls dict now if it is there - self._end_outgoing_call(callid_header) + self._end_outgoing_call(smsg.headers["call-id"]) ok_lines = [ "SIP/2.0 200 OK", - f"Via: {via_header}", - f"From: {from_header}", - f"To: {to_header}", - f"Call-ID: {callid_header}", - f"CSeq: {cseq_header}", - "User-Agent: test-agent 1.0", + f"Via: {smsg.headers['via']}", + f"From: {smsg.headers['from']}", + f"To: {smsg.headers['to']}", + f"Call-ID: {smsg.headers['call-id']}", + f"CSeq: {smsg.headers['cseq']}", + f"User-Agent: {VOIP_UTILS_AGENT} 1.0", "Content-Length: 0", ] ok_text = _CRLF.join(ok_lines) + _CRLF @@ -530,68 +560,6 @@ def answer( server_rtp_port, ) - def _parse_sip( - self, message: str - ) -> Tuple[Optional[str], Optional[str], Dict[str, str], str]: - """Parse SIP message and return method, headers, and body.""" - lines = message.splitlines() - - method: Optional[str] = None - ruri: Optional[str] = None - headers: dict[str, str] = {} - offset: int = 0 - - # See: https://datatracker.ietf.org/doc/html/rfc3261 - for i, line in enumerate(lines): - if line: - offset += len(line) + len(_CRLF) - - if i == 0: - line_parts = line.split() - method = line_parts[0] - ruri = line_parts[1] - elif not line: - break - else: - key, value = line.split(":", maxsplit=1) - headers[key.lower()] = value.strip() - - body = message[offset:] - - return method, ruri, headers, body - - def _parse_sip_reply( - self, message: str - ) -> Tuple[Optional[str], Optional[str], Optional[str], Dict[str, str], str]: - """Parse SIP message and return method, headers, and body.""" - lines = message.splitlines() - - protocol: Optional[str] = None - code: Optional[str] = None - reason: Optional[str] = None - headers: dict[str, str] = {} - offset: int = 0 - - # See: https://datatracker.ietf.org/doc/html/rfc3261 - for i, line in enumerate(lines): - if line: - offset += len(line) + len(_CRLF) - - if i == 0: - line_parts = line.split() - protocol = line_parts[0] - code = line_parts[1] - reason = line_parts[2] - elif not line: - break - else: - key, value = line.split(":", maxsplit=1) - headers[key.lower()] = value.strip() - - body = message[offset:] - - return protocol, code, reason, headers, body - class CallPhoneDatagramProtocol(asyncio.DatagramProtocol, ABC): def __init__( @@ -644,7 +612,7 @@ def connection_made(self, transport): f"To: {self._dest_endpoint.sip_header}", f"Call-ID: {self._call_id}", "CSeq: 50 INVITE", - "User-Agent: test-agent 1.0", + f"User-Agent: {VOIP_UTILS_AGENT} 1.0", "Allow: INVITE, ACK, OPTIONS, CANCEL, BYE, SUBSCRIBE, NOTIFY, INFO, REFER, UPDATE", "Accept: application/sdp, application/dtmf-relay", "Content-Type: application/sdp", @@ -686,19 +654,6 @@ def datagram_received(self, data: bytes, addr): _LOGGER.debug( "Got 401 Unauthorized response, should attempt authentication here..." ) - # register_lines = [ - # f"REGISTER {self._dest_endpoint.uri} SIP/2.0", - # f"Via: SIP/2.0/UDP {self._source_endpoint.host}:{self._source_endpoint.port}", - # f"From: {self._source_endpoint.sip_header}", - # f"Contact: {self._source_endpoint.sip_header}", - # f"To: {self._dest_endpoint.sip_header}", - # f"Call-ID: {self._call_id}", - # "CSeq: 51 REGISTER", - # "Authorization: ", - # "User-Agent: test-agent 1.0", - # "Allow: INVITE, ACK, OPTIONS, CANCEL, BYE, SUBSCRIBE, NOTIFY, INFO, REFER, UPDATE", - # "", - # ] elif _version == "BYE": _LOGGER.debug("Received BYE message: %s", line) if self.transport is None: @@ -724,19 +679,14 @@ def datagram_received(self, data: bytes, addr): rtp_info = get_rtp_info(body) remote_rtp_port = rtp_info.rtp_port opus_payload_type = rtp_info.payload_type - via_header = headers["via"] - from_header = headers["from"] - to_header = headers["to"] - callid_header = headers["call-id"] - cseq_header = headers["cseq"] ok_lines = [ "SIP/2.0 200 OK", - f"Via: {via_header}", - f"From: {from_header}", - f"To: {to_header}", - f"Call-ID: {callid_header}", - f"CSeq: {cseq_header}", - "User-Agent: test-agent 1.0", + f"Via: {headers['via']}", + f"From: {headers['from']}", + f"To: {headers['to']}", + f"Call-ID: {headers['call-id']}", + f"CSeq: {headers['cseq']}", + f"User-Agent: {VOIP_UTILS_AGENT} 1.0", "Content-Length: 0", ] ok_text = _CRLF.join(ok_lines) + _CRLF @@ -772,15 +722,14 @@ def datagram_received(self, data: bytes, addr): rtp_info = get_rtp_info(body) remote_rtp_port = rtp_info.rtp_port opus_payload_type = rtp_info.payload_type - to_header = headers["to"] ack_lines = [ f"ACK {self._dest_endpoint.uri} SIP/2.0", f"Via: SIP/2.0/UDP {self._source_endpoint.host}:{self._source_endpoint.port}", f"From: {self._source_endpoint.sip_header}", - f"To: {to_header}", + f"To: {headers['to']}", f"Call-ID: {self._call_id}", "CSeq: 50 ACK", - "User-Agent: test-agent 1.0", + f"User-Agent: {VOIP_UTILS_AGENT} 1.0", "Content-Length: 0", ] ack_text = _CRLF.join(ack_lines) + _CRLF @@ -817,7 +766,7 @@ def hang_up(self): f"To: {self._dest_endpoint.sip_header}", f"Call-ID: {self._call_id}", "CSeq: 51 BYE", - "User-Agent: test-agent 1.0", + f"User-Agent: {VOIP_UTILS_AGENT} 1.0", "Content-Length: 0", "", ] From 3ddfaa4e87270cc5bab6972483f26a1ce2d8df50 Mon Sep 17 00:00:00 2001 From: Jamin Hitchcock Date: Sun, 12 Jan 2025 13:37:44 -0600 Subject: [PATCH 04/10] Add opuslib to dev requirements. --- requirements_dev.txt | 1 + 1 file changed, 1 insertion(+) diff --git a/requirements_dev.txt b/requirements_dev.txt index c6eafa1..0789c4b 100644 --- a/requirements_dev.txt +++ b/requirements_dev.txt @@ -4,6 +4,7 @@ black==23.3.0 flake8==7.1.0 isort==5.12.0 mypy==1.1.1 +opuslib==3.0.1 pylint==3.2.5 pytest==7.2.2 python-dotenv==1.0.1 From d6ff549481a38ed7dac24370974189a08a73dec3 Mon Sep 17 00:00:00 2001 From: Jamin Hitchcock Date: Tue, 14 Jan 2025 21:40:06 -0600 Subject: [PATCH 05/10] Add abstract on_hangup method. The on_hangup method will be called when a BYE message is received to allow any state about the call to be cleaned up after the call has ended. --- voip_utils/sip.py | 23 +++++++++++++++++++++++ 1 file changed, 23 insertions(+) diff --git a/voip_utils/sip.py b/voip_utils/sip.py index 9810bba..d23be2c 100644 --- a/voip_utils/sip.py +++ b/voip_utils/sip.py @@ -468,6 +468,15 @@ def datagram_received(self, data: bytes, addr): _LOGGER.debug("Skipping message: %s", message) return + # The From header should give us the URI used for sending SIP messages to the device + if smsg.headers.get("from") is not None: + caller_endpoint = SipEndpoint(smsg.headers.get("from", "")) + # We can try using the Contact header as a fallback + elif smsg.headers.get("contact") is not None: + caller_endpoint = SipEndpoint(smsg.headers.get("contact", "")) + # If all else fails try to generate a URI based on the IP and port from the address the message came from + else: + caller_endpoint = get_sip_endpoint(caller_ip, port=caller_sip_port) # Acknowledge the BYE message, otherwise the phone will keep sending it rtp_info = get_rtp_info(smsg.body) remote_rtp_port = rtp_info.rtp_port @@ -495,6 +504,16 @@ def datagram_received(self, data: bytes, addr): # The transport might be used for incoming calls # as well, so we should leave it open. + # Cleanup any necessary call state + self.on_hangup( + CallInfo( + caller_endpoint=caller_endpoint, + caller_rtp_port=remote_rtp_port, + server_ip=remote_rtp_ip, + headers=smsg.headers, + ) + ) + except Exception: _LOGGER.exception("Unexpected error handling SIP message") @@ -502,6 +521,10 @@ def datagram_received(self, data: bytes, addr): def on_call(self, call_info: CallInfo): """Handle incoming calls.""" + @abstractmethod + def on_hangup(self, call_info: CallInfo): + """Handle the end of a call.""" + def answer( self, call_info: CallInfo, From 534ab7c169fc8a9d4020f37f887330696d5ab210 Mon Sep 17 00:00:00 2001 From: Jamin Hitchcock Date: Fri, 17 Jan 2025 19:17:33 -0600 Subject: [PATCH 06/10] Fix remote IP --- voip_utils/sip.py | 1 + 1 file changed, 1 insertion(+) diff --git a/voip_utils/sip.py b/voip_utils/sip.py index d23be2c..bf07015 100644 --- a/voip_utils/sip.py +++ b/voip_utils/sip.py @@ -479,6 +479,7 @@ def datagram_received(self, data: bytes, addr): caller_endpoint = get_sip_endpoint(caller_ip, port=caller_sip_port) # Acknowledge the BYE message, otherwise the phone will keep sending it rtp_info = get_rtp_info(smsg.body) + remote_rtp_ip = rtp_info.rtp_ip remote_rtp_port = rtp_info.rtp_port opus_payload_type = rtp_info.payload_type # We should remove the call from the outgoing calls dict now if it is there From 547d3649e92248aa60a0091d1bd2d2f2e15cc9f6 Mon Sep 17 00:00:00 2001 From: Jamin Hitchcock Date: Sat, 18 Jan 2025 14:57:15 -0600 Subject: [PATCH 07/10] Remove unnecessary logging --- voip_utils/sip.py | 1 - 1 file changed, 1 deletion(-) diff --git a/voip_utils/sip.py b/voip_utils/sip.py index bf07015..6fca1c0 100644 --- a/voip_utils/sip.py +++ b/voip_utils/sip.py @@ -233,7 +233,6 @@ def outgoing_call( ): """Make an outgoing call from the given source endpoint to the destination endpoint, using the rtp_port for the local RTP port of the call.""" if self.transport is None: - _LOGGER.warning("No transport for outgoing VOIP calls") raise RuntimeError("No transport available for outgoing call.") session_id = str(time.monotonic_ns()) From cff70d0e7d24ce62c70179ce311c04d3e416e035 Mon Sep 17 00:00:00 2001 From: Jamin Hitchcock Date: Sat, 18 Jan 2025 16:03:55 -0600 Subject: [PATCH 08/10] Add hangup method and remove unused classes/files --- call_example.py | 33 +++- voip_utils/call_phone.py | 101 ------------ voip_utils/sip.py | 320 +++++++-------------------------------- 3 files changed, 78 insertions(+), 376 deletions(-) delete mode 100644 voip_utils/call_phone.py diff --git a/call_example.py b/call_example.py index 4cf570a..b80e3f2 100644 --- a/call_example.py +++ b/call_example.py @@ -8,10 +8,9 @@ from dotenv import load_dotenv -from voip_utils.call_phone import VoipCallDatagramProtocol +from voip_utils.voip import VoipDatagramProtocol, CallProtocolFactory from voip_utils.sip import ( CallInfo, - CallPhoneDatagramProtocol, SdpInfo, SipEndpoint, get_sip_endpoint, @@ -52,6 +51,22 @@ def get_env_int(env_var: str, default_val: int) -> int: "sleep_ratio": 0.99, } +class ExampleVoipDatagramProtocol(VoipDatagramProtocol): + + def __init__( + self, + sdp_info: SdpInfo, + valid_protocol_factory: CallProtocolFactory, + invalid_protocol_factory: Optional[CallProtocolFactory] = None, + ) -> None: + """Set up VoIP call handler.""" + super().__init__(sdp_info, valid_protocol_factory, invalid_protocol_factory) + self.call_end = asyncio.Event() + + + def on_hangup(self, call_info: CallInfo): + """Example implementation of on hangup. """ + self.call_end.set() class PreRecordMessageProtocol(RtpDatagramProtocol): """Plays a pre-recorded message on a loop.""" @@ -153,11 +168,13 @@ async def main() -> None: pass _, protocol = await loop.create_datagram_endpoint( - lambda: VoipCallDatagramProtocol( + lambda: ExampleVoipDatagramProtocol( None, - source, - destination, - rtp_port, + lambda call_info, rtcp_state: PreRecordMessageProtocol( + "problem.pcm", + call_info.opus_payload_type, + rtcp_state=rtcp_state, + ), lambda call_info, rtcp_state: PreRecordMessageProtocol( "problem.pcm", call_info.opus_payload_type, @@ -167,7 +184,9 @@ async def main() -> None: local_addr=(CALL_SRC_IP, CALL_SRC_PORT), ) - await protocol.wait_closed() + protocol.outgoing_call(source, destination, rtp_port) + + await protocol.call_end.wait() if __name__ == "__main__": diff --git a/voip_utils/call_phone.py b/voip_utils/call_phone.py deleted file mode 100644 index 88fbc8e..0000000 --- a/voip_utils/call_phone.py +++ /dev/null @@ -1,101 +0,0 @@ -import asyncio -import logging -from asyncio.transports import DatagramTransport -from functools import partial -from typing import Any, Callable, Optional, Set - -from .sip import CallInfo, CallPhoneDatagramProtocol, SdpInfo, SipEndpoint -from .voip import RtcpDatagramProtocol, RtcpState - -_LOGGER = logging.getLogger(__name__) - -RATE = 16000 -WIDTH = 2 -CHANNELS = 1 -RTP_AUDIO_SETTINGS = { - "rate": RATE, - "width": WIDTH, - "channels": CHANNELS, - "sleep_ratio": 0.99, -} - -CallProtocolFactory = Callable[[CallInfo, RtcpState], asyncio.DatagramProtocol] - - -class VoipCallDatagramProtocol(CallPhoneDatagramProtocol): - """UDP server for Voice over IP (VoIP).""" - - def __init__( - self, - sdp_info: SdpInfo | None, - source_endpoint: SipEndpoint, - dest_endpoint: SipEndpoint, - rtp_port: int, - call_protocol_factory: CallProtocolFactory, - ) -> None: - """Set up VoIP call handler.""" - super().__init__(sdp_info, source_endpoint, dest_endpoint, rtp_port) - self.call_protocol_factory = call_protocol_factory - self._tasks: Set[asyncio.Future[Any]] = set() - self._rtp_transport: Optional[DatagramTransport] = None - self._rtpc_transport: Optional[DatagramTransport] = None - - def on_call(self, call_info: CallInfo): - """Answer incoming calls and start RTP server on specified port.""" - - rtp_ip = self._source_endpoint.host - - _LOGGER.debug( - "Starting RTP server on ip=%s, rtp_port=%s, rtcp_port=%s", - rtp_ip, - self._rtp_port, - self._rtp_port + 1, - ) - - # Handle RTP packets in RTP server - rtp_task = asyncio.create_task( - self._create_rtp_server( - self.call_protocol_factory, call_info, rtp_ip, self._rtp_port - ) - ) - self._tasks.add(rtp_task) - rtp_task.add_done_callback(self._tasks.remove) - - _LOGGER.debug("RTP server started") - - def call_cleanup(self): - _LOGGER.debug("Closing RTP/C servers for end of call") - if self._rtp_transport is not None: - self._rtp_transport.close() - self._rtp_transport = None - if self._rtpc_transport is not None: - self._rtpc_transport.close() - self._rtpc_transport = None - - def end_call(self, task): - """Callback for hanging up when call is ended.""" - self.hang_up() - - async def _create_rtp_server( - self, - protocol_factory: CallProtocolFactory, - call_info: CallInfo, - rtp_ip: str, - rtp_port: int, - ): - # Shared state between RTP/RTCP servers - rtcp_state = RtcpState() - - loop = asyncio.get_running_loop() - - # RTCP server - self._rtpc_transport, _ = await loop.create_datagram_endpoint( - lambda: RtcpDatagramProtocol(rtcp_state), - (rtp_ip, rtp_port + 1), - ) - - # RTP server - self._rtp_transport, _ = await loop.create_datagram_endpoint( - partial(protocol_factory, call_info, rtcp_state), - (rtp_ip, rtp_port), - ) diff --git a/voip_utils/sip.py b/voip_utils/sip.py index 6fca1c0..510e714 100644 --- a/voip_utils/sip.py +++ b/voip_utils/sip.py @@ -8,7 +8,7 @@ import time from abc import ABC, abstractmethod from dataclasses import dataclass, field -from typing import Dict, Optional, Tuple +from typing import Optional from .const import OPUS_PAYLOAD_TYPE from .error import VoipError @@ -126,6 +126,7 @@ class CallInfo: """Information gathered from an INVITE message.""" caller_endpoint: SipEndpoint + local_endpoint: SipEndpoint caller_rtp_port: int server_ip: str headers: dict[str, str] @@ -288,6 +289,32 @@ def outgoing_call( (destination.host, destination.port), ) + def hang_up(self, call_info: CallInfo): + """Hang up the call when finished""" + if self.transport is None: + raise RuntimeError("No transport available for sending hangup.") + + bye_lines = [ + f"BYE {call_info.caller_endpoint.uri} SIP/2.0", + f"Via: SIP/2.0/UDP {call_info.local_endpoint.host}:{call_info.local_endpoint.port}", + f"From: {call_info.local_endpoint.sip_header}", + f"To: {call_info.caller_endpoint.sip_header}", + f"Call-ID: {call_info.headers['call-id']}", + "CSeq: 51 BYE", + f"User-Agent: {VOIP_UTILS_AGENT} 1.0", + "Content-Length: 0", + "", + ] + _LOGGER.debug("Hanging up...") + bye_text = _CRLF.join(bye_lines) + _CRLF + bye_bytes = bye_text.encode("utf-8") + self.transport.sendto( + bye_bytes, (call_info.caller_endpoint.host, call_info.caller_endpoint.port) + ) + + self._end_outgoing_call(call_info.headers["call-id"]) + self.on_hangup(call_info) + def _register_outgoing_call(self, call_id: str, rtp_port: int): """Register the RTP port associated with an outgoing call.""" self._outgoing_calls[call_id] = rtp_port @@ -307,6 +334,10 @@ def connection_made(self, transport): def datagram_received(self, data: bytes, addr): """Handle INVITE SIP messages.""" try: + if self.transport is None: + _LOGGER.warning("No transport for exchanging SIP message") + return + caller_ip, caller_sip_port = addr message = data.decode("utf-8") smsg = SipMessage.parse_sip(message) @@ -341,6 +372,13 @@ def datagram_received(self, data: bytes, addr): else: caller_endpoint = get_sip_endpoint(caller_ip, port=caller_sip_port) + local_endpoint = None + if smsg.headers.get("to") is not None: + local_endpoint = SipEndpoint(smsg.headers.get("to", "")) + else: + local_ip, local_port = self.transport.get_extra_info("sockname") + local_endpoint = get_sip_endpoint(local_ip, port=local_port) + _LOGGER.debug("Incoming call from endpoint=%s", caller_endpoint) # Extract caller's RTP port from SDP. @@ -397,6 +435,7 @@ def datagram_received(self, data: bytes, addr): self.on_call( CallInfo( caller_endpoint=caller_endpoint, + local_endpoint=local_endpoint, caller_rtp_port=caller_rtp_port, server_ip=server_ip, headers=smsg.headers, @@ -413,9 +452,6 @@ def datagram_received(self, data: bytes, addr): return _LOGGER.debug("Got OK message") - if self.transport is None: - _LOGGER.warning("No transport for exchanging SIP message") - return rtp_info = get_rtp_info(smsg.body) remote_rtp_ip = rtp_info.rtp_ip remote_rtp_port = rtp_info.rtp_port @@ -452,6 +488,7 @@ def datagram_received(self, data: bytes, addr): self.on_call( CallInfo( caller_endpoint=caller_endpoint, + local_endpoint=local_endpoint, caller_rtp_port=remote_rtp_port, server_ip=remote_rtp_ip, headers=smsg.headers, @@ -476,6 +513,16 @@ def datagram_received(self, data: bytes, addr): # If all else fails try to generate a URI based on the IP and port from the address the message came from else: caller_endpoint = get_sip_endpoint(caller_ip, port=caller_sip_port) + + local_endpoint = None + if smsg.headers.get("to") is not None: + local_endpoint = SipEndpoint(smsg.headers.get("to", "")) + else: + local_ip, local_port = self.transport.get_extra_info("sockname") + local_endpoint = get_sip_endpoint(local_ip, port=local_port) + + _LOGGER.debug("Incoming BYE from endpoint=%s", caller_endpoint) + # Acknowledge the BYE message, otherwise the phone will keep sending it rtp_info = get_rtp_info(smsg.body) remote_rtp_ip = rtp_info.rtp_ip @@ -508,6 +555,7 @@ def datagram_received(self, data: bytes, addr): self.on_hangup( CallInfo( caller_endpoint=caller_endpoint, + local_endpoint=local_endpoint, caller_rtp_port=remote_rtp_port, server_ip=remote_rtp_ip, headers=smsg.headers, @@ -582,267 +630,3 @@ def answer( call_info.caller_sip_port, server_rtp_port, ) - - -class CallPhoneDatagramProtocol(asyncio.DatagramProtocol, ABC): - def __init__( - self, - sdp_info: SdpInfo | None, - source: SipEndpoint, - dest: SipEndpoint, - rtp_port: int, - ) -> None: - self.sdp_info = sdp_info - self.transport = None - self._closed_event = asyncio.Event() - self._loop = asyncio.get_running_loop() - self._session_id = str(time.monotonic_ns()) - self._session_version = self._session_id - self._call_id = self._session_id - self._source_endpoint = source - self._dest_endpoint = dest - self._rtp_port = rtp_port - - def connection_made(self, transport): - self.transport = transport - - sdp_lines = [ - "v=0", - f"o={self._source_endpoint.username} {self._session_id} {self._session_version} IN IP4 {self._source_endpoint.host}", - "s=Talk", - f"c=IN IP4 {self._source_endpoint.host}", - "t=0 0", - f"m=audio {self._rtp_port} RTP/AVP 123 96 101 103 104", - "a=sendrecv", - "a=rtpmap:96 opus/48000/2", - "a=fmtp:96 useinbandfec=0", - "a=rtpmap:123 opus/48000/2", - "a=fmtp:123 maxplaybackrate=16000", - "a=rtpmap:101 telephone-event/48000", - "a=rtpmap:103 telephone-event/16000", - "a=rtpmap:104 telephone-event/8000", - "a=ptime:20", - "", - ] - sdp_text = _CRLF.join(sdp_lines) - sdp_bytes = sdp_text.encode("utf-8") - - invite_lines = [ - f"INVITE {self._dest_endpoint.uri} SIP/2.0", - f"Via: SIP/2.0/UDP {self._source_endpoint.host}:{self._source_endpoint.port}", - f"From: {self._source_endpoint.sip_header}", - f"Contact: {self._source_endpoint.sip_header}", - f"To: {self._dest_endpoint.sip_header}", - f"Call-ID: {self._call_id}", - "CSeq: 50 INVITE", - f"User-Agent: {VOIP_UTILS_AGENT} 1.0", - "Allow: INVITE, ACK, OPTIONS, CANCEL, BYE, SUBSCRIBE, NOTIFY, INFO, REFER, UPDATE", - "Accept: application/sdp, application/dtmf-relay", - "Content-Type: application/sdp", - f"Content-Length: {len(sdp_bytes)}", - "", - ] - invite_text = _CRLF.join(invite_lines) + _CRLF - invite_bytes = invite_text.encode("utf-8") - - _LOGGER.debug(invite_bytes + sdp_bytes) - - self.transport.sendto( - invite_bytes + sdp_bytes, - (self._dest_endpoint.host, self._dest_endpoint.port), - ) - - def datagram_received(self, data: bytes, addr): - response_text = data.decode("utf-8") - response_lines = response_text.splitlines() - _LOGGER.debug(response_lines) - is_ok = False - - for i, line in enumerate(response_lines): - line = line.strip() - if not line: - break - if i > 0: - continue - _version, code, response_type = line.split(maxsplit=2) - _LOGGER.debug( - "Version=%s, Code=%s, response_type=%s", - _version, - code, - response_type, - ) - if (code == "200") and (response_type == "OK"): - is_ok = True - elif code == "401": - _LOGGER.debug( - "Got 401 Unauthorized response, should attempt authentication here..." - ) - elif _version == "BYE": - _LOGGER.debug("Received BYE message: %s", line) - if self.transport is None: - _LOGGER.debug("Skipping message: %s", line) - continue - - # Acknowledge the BYE message, otherwise the phone will keep sending it - ( - protocol, - code, - reason, - headers, - body, - ) = self._parse_sip_reply(response_text) - _LOGGER.debug( - "Parsed response protocol=%s code=%s reason=%s headers=[%s] body=[%s]", - protocol, - code, - reason, - headers, - body, - ) - rtp_info = get_rtp_info(body) - remote_rtp_port = rtp_info.rtp_port - opus_payload_type = rtp_info.payload_type - ok_lines = [ - "SIP/2.0 200 OK", - f"Via: {headers['via']}", - f"From: {headers['from']}", - f"To: {headers['to']}", - f"Call-ID: {headers['call-id']}", - f"CSeq: {headers['cseq']}", - f"User-Agent: {VOIP_UTILS_AGENT} 1.0", - "Content-Length: 0", - ] - ok_text = _CRLF.join(ok_lines) + _CRLF - ok_bytes = ok_text.encode("utf-8") - # We should probably tell the associated RTP server to shutdown at this point, assuming we aren't reusing it for other calls - _LOGGER.debug("Sending OK for BYE message: %s", ok_text) - self.transport.sendto( - ok_bytes, - (self._dest_endpoint.host, self._dest_endpoint.port), - ) - - self.transport.close() - self.transport = None - - if not is_ok: - _LOGGER.debug("Received non-OK response [%s]", response_text) - return - - _LOGGER.debug("Got OK message") - if self.transport is None: - _LOGGER.debug("No transport for exchanging SIP message") - return - - protocol, code, reason, headers, body = self._parse_sip_reply(response_text) - _LOGGER.debug( - "Parsed response protocol=%s code=%s reason=%s headers=[%s] body=[%s]", - protocol, - code, - reason, - headers, - body, - ) - rtp_info = get_rtp_info(body) - remote_rtp_port = rtp_info.rtp_port - opus_payload_type = rtp_info.payload_type - ack_lines = [ - f"ACK {self._dest_endpoint.uri} SIP/2.0", - f"Via: SIP/2.0/UDP {self._source_endpoint.host}:{self._source_endpoint.port}", - f"From: {self._source_endpoint.sip_header}", - f"To: {headers['to']}", - f"Call-ID: {self._call_id}", - "CSeq: 50 ACK", - f"User-Agent: {VOIP_UTILS_AGENT} 1.0", - "Content-Length: 0", - ] - ack_text = _CRLF.join(ack_lines) + _CRLF - ack_bytes = ack_text.encode("utf-8") - self.transport.sendto( - ack_bytes, (self._dest_endpoint.host, self._dest_endpoint.port) - ) - - # The call been answered, proceed with desired action here - self.on_call( - CallInfo( - caller_endpoint=self._dest_endpoint, - caller_rtp_port=remote_rtp_port, - server_ip=self._dest_endpoint.host, - headers=headers, - opus_payload_type=opus_payload_type, # Should probably update this to eventually support more codecs - ) - ) - - @abstractmethod - def on_call(self, call_info: CallInfo): - """Handle outgoing calls.""" - - @abstractmethod - def call_cleanup(self): - """Handle cleanup after ending call.""" - - def hang_up(self): - """Hang up the call when finished""" - bye_lines = [ - f"BYE {self._dest_endpoint.uri} SIP/2.0", - f"Via: SIP/2.0/UDP {self._source_endpoint.host}:{self._source_endpoint.port}", - f"From: {self._source_endpoint.sip_header}", - f"To: {self._dest_endpoint.sip_header}", - f"Call-ID: {self._call_id}", - "CSeq: 51 BYE", - f"User-Agent: {VOIP_UTILS_AGENT} 1.0", - "Content-Length: 0", - "", - ] - _LOGGER.debug("Hanging up...") - bye_text = _CRLF.join(bye_lines) + _CRLF - bye_bytes = bye_text.encode("utf-8") - self.transport.sendto( - bye_bytes, (self._dest_endpoint.host, self._dest_endpoint.port) - ) - - self.call_cleanup() - - self.transport.close() - self.transport = None - - def connection_lost(self, exc): - """Signal wait_closed when transport is completely closed.""" - _LOGGER.debug("Connection lost") - self._closed_event.set() - self.call_cleanup() - - async def wait_closed(self) -> None: - """Wait for connection_lost to be called.""" - await self._closed_event.wait() - - def _parse_sip_reply( - self, message: str - ) -> Tuple[Optional[str], Optional[str], Optional[str], Dict[str, str], str]: - """Parse SIP message and return method, headers, and body.""" - lines = message.splitlines() - - protocol: Optional[str] = None - code: Optional[str] = None - reason: Optional[str] = None - headers: dict[str, str] = {} - offset: int = 0 - - # See: https://datatracker.ietf.org/doc/html/rfc3261 - for i, line in enumerate(lines): - if line: - offset += len(line) + len(_CRLF) - - if i == 0: - line_parts = line.split() - protocol = line_parts[0] - code = line_parts[1] - reason = line_parts[2] - elif not line: - break - else: - key, value = line.split(":", maxsplit=1) - headers[key.lower()] = value.strip() - - body = message[offset:] - - return protocol, code, reason, headers, body From 1f730113fe0377d603d47d804a24281b0b7f9c81 Mon Sep 17 00:00:00 2001 From: Jamin Hitchcock Date: Sun, 19 Jan 2025 20:26:26 -0600 Subject: [PATCH 09/10] Fix datagram handling for invite and non-invite responses --- voip_utils/sip.py | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/voip_utils/sip.py b/voip_utils/sip.py index 510e714..d72e56b 100644 --- a/voip_utils/sip.py +++ b/voip_utils/sip.py @@ -452,6 +452,12 @@ def datagram_received(self, data: bytes, addr): return _LOGGER.debug("Got OK message") + if not self._is_response_type(smsg, "invite"): + # This will happen if/when we hang up. + _LOGGER.debug("Got response for non-invite message") + return + + _LOGGER.debug("Got invite response") rtp_info = get_rtp_info(smsg.body) remote_rtp_ip = rtp_info.rtp_ip remote_rtp_port = rtp_info.rtp_port @@ -573,6 +579,14 @@ def on_call(self, call_info: CallInfo): def on_hangup(self, call_info: CallInfo): """Handle the end of a call.""" + def _is_response_type(self, msg: SipMessage, resp_type: str) -> bool: + """Return whether or not the response message is for the given type.""" + return ( + msg is not None + and "cseq" in msg.headers + and resp_type.lower() in msg.headers["cseq"].lower() + ) + def answer( self, call_info: CallInfo, From 8c2429b22c8537ab71174bbcfa7bb87691266098 Mon Sep 17 00:00:00 2001 From: Jamin Hitchcock Date: Sat, 25 Jan 2025 08:17:02 -0600 Subject: [PATCH 10/10] Don't require on_hangup implementation --- voip_utils/sip.py | 1 - 1 file changed, 1 deletion(-) diff --git a/voip_utils/sip.py b/voip_utils/sip.py index d72e56b..6e2c8e4 100644 --- a/voip_utils/sip.py +++ b/voip_utils/sip.py @@ -575,7 +575,6 @@ def datagram_received(self, data: bytes, addr): def on_call(self, call_info: CallInfo): """Handle incoming calls.""" - @abstractmethod def on_hangup(self, call_info: CallInfo): """Handle the end of a call."""