diff --git a/.gitignore b/.gitignore index 4398532..dd2703b 100644 --- a/.gitignore +++ b/.gitignore @@ -8,6 +8,7 @@ tmp/ htmlcov .projectile +.env .venv/ venv/ .mypy_cache/ diff --git a/README.md b/README.md index e38263f..227d27c 100644 --- a/README.md +++ b/README.md @@ -1,3 +1,19 @@ # VoIP Utils Voice over IP utilities for the [voip integration](https://www.home-assistant.io/integrations/voip/). + +## Test outgoing call +Install dependencies from requirements_dev.txt + +Set environment variables for source and destination endpoints in .env file + CALL_SRC_USER = "homeassistant" + CALL_SRC_IP = "192.168.1.1" + CALL_SRC_PORT = 5060 + CALL_VIA_IP = "192.168.1.1" + CALL_DEST_IP = "192.168.1.2" + CALL_DEST_PORT = 5060 + CALL_DEST_USER = "phone" + +Run script +python -m voip_utils.call_phone + diff --git a/problem.pcm b/problem.pcm new file mode 100644 index 0000000..8873766 Binary files /dev/null and b/problem.pcm differ diff --git a/requirements_dev.txt b/requirements_dev.txt index d9522fb..c6eafa1 100644 --- a/requirements_dev.txt +++ b/requirements_dev.txt @@ -6,3 +6,4 @@ isort==5.12.0 mypy==1.1.1 pylint==3.2.5 pytest==7.2.2 +python-dotenv==1.0.1 diff --git a/voip_utils/call_phone.py b/voip_utils/call_phone.py index 74b6910..8febc7f 100644 --- a/voip_utils/call_phone.py +++ b/voip_utils/call_phone.py @@ -1,16 +1,226 @@ import asyncio import logging +import os +import socket +from functools import partial +from pathlib import Path +from typing import Any, Callable, Optional, Set -from .sip import CallPhoneDatagramProtocol, CALL_SRC_IP +from dotenv import load_dotenv + +from .sip import CallInfo, CallPhoneDatagramProtocol, SdpInfo, SipEndpoint +from .voip import RtcpDatagramProtocol, RtcpState, RtpDatagramProtocol + +_LOGGER = logging.getLogger(__name__) + +load_dotenv() + +CALL_SRC_USER = os.getenv("CALL_SRC_USER") +CALL_SRC_IP = os.getenv("CALL_SRC_IP") +CALL_SRC_PORT = int(os.getenv("CALL_SRC_PORT")) +CALL_VIA_IP = os.getenv("CALL_VIA_IP") +CALL_DEST_IP = os.getenv("CALL_DEST_IP") +CALL_DEST_PORT = int(os.getenv("CALL_DEST_PORT")) +CALL_DEST_USER = os.getenv("CALL_DEST_USER") + + +RATE = 16000 +WIDTH = 2 +CHANNELS = 1 +RTP_AUDIO_SETTINGS = { + "rate": RATE, + "width": WIDTH, + "channels": CHANNELS, + "sleep_ratio": 0.99, +} + +CallProtocolFactory = Callable[[CallInfo, RtcpState], asyncio.Protocol] + + +class VoipCallDatagramProtocol(CallPhoneDatagramProtocol): + """UDP server for Voice over IP (VoIP).""" + + def __init__( + self, + sdp_info: SdpInfo, + source_endpoint: SipEndpoint, + dest_endpoint: SipEndpoint, + rtp_port: int, + call_protocol_factory: CallProtocolFactory, + ) -> None: + """Set up VoIP call handler.""" + super().__init__(sdp_info, source_endpoint, dest_endpoint, rtp_port) + self.call_protocol_factory = call_protocol_factory + self._tasks: Set[asyncio.Future[Any]] = set() + + def on_call(self, call_info: CallInfo): + """Answer incoming calls and start RTP server on a random port.""" + + rtp_ip = self._source_endpoint.host + + _LOGGER.debug( + "Starting RTP server on ip=%s, rtp_port=%s, rtcp_port=%s", + rtp_ip, + self._rtp_port, + self._rtp_port + 1, + ) + + # Handle RTP packets in RTP server + rtp_task = asyncio.create_task( + self._create_rtp_server( + self.call_protocol_factory, call_info, rtp_ip, self._rtp_port + ) + ) + self._tasks.add(rtp_task) + rtp_task.add_done_callback(self._tasks.remove) + + _LOGGER.debug("RTP server started") + + def end_call(self, task): + """Callback for hanging up when call is ended.""" + self.hang_up() + + async def _create_rtp_server( + self, + protocol_factory: CallProtocolFactory, + call_info: CallInfo, + rtp_ip: str, + rtp_port: int, + ): + # Shared state between RTP/RTCP servers + rtcp_state = RtcpState() + + loop = asyncio.get_running_loop() + + # RTCP server + await loop.create_datagram_endpoint( + lambda: RtcpDatagramProtocol(rtcp_state), + (rtp_ip, rtp_port + 1), + ) + + # RTP server + await loop.create_datagram_endpoint( + partial(protocol_factory, call_info, rtcp_state), + (rtp_ip, rtp_port), + ) + + +class PreRecordMessageProtocol(RtpDatagramProtocol): + """Plays a pre-recorded message on a loop.""" + + def __init__( + self, + file_name: str, + opus_payload_type: int, + message_delay: float = 1.0, + loop_delay: float = 2.0, + loop: Optional[asyncio.AbstractEventLoop] = None, + rtcp_state: RtcpState | None = None, + ) -> None: + """Set up RTP server.""" + super().__init__( + rate=RATE, + width=WIDTH, + channels=CHANNELS, + opus_payload_type=opus_payload_type, + rtcp_state=rtcp_state, + ) + self.loop = loop + self.file_name = file_name + self.message_delay = message_delay + self.loop_delay = loop_delay + self._audio_task: asyncio.Task | None = None + self._audio_bytes: bytes | None = None + _LOGGER.debug("Created PreRecordMessageProtocol") + + def on_chunk(self, audio_bytes: bytes) -> None: + """Handle raw audio chunk.""" + _LOGGER.debug("on_chunk") + if self.transport is None: + return + + if self._audio_bytes is None: + # 16Khz, 16-bit mono audio message + file_path = Path(__file__).parent / self.file_name + self._audio_bytes = file_path.read_bytes() + + if self._audio_task is None: + self._audio_task = self.loop.create_task( + self._play_message(), + name="voip_not_connected", + ) + + async def _play_message(self) -> None: + _LOGGER.debug("_play_message") + self.send_audio( + self._audio_bytes, + self.rate, + self.width, + self.channels, + self.addr, + silence_before=self.message_delay, + ) + + await asyncio.sleep(self.loop_delay) + + # Allow message to play again - Only play once for testing + # self._audio_task = None async def main() -> None: logging.basicConfig(level=logging.DEBUG) loop = asyncio.get_event_loop() + source = SipEndpoint( + username=CALL_SRC_USER, host=CALL_SRC_IP, port=CALL_SRC_PORT, description=None + ) + destination = SipEndpoint( + username=CALL_DEST_USER, + host=CALL_DEST_IP, + port=CALL_DEST_PORT, + description=None, + ) + + # Find free RTP/RTCP ports + rtp_ip = "" + rtp_port = 0 + + while True: + sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + sock.setblocking(False) + + # Bind to a random UDP port + sock.bind(("", 0)) + rtp_ip, rtp_port = sock.getsockname() + + # Close socket to free port for re-use + sock.close() + + # Check that the next port up is available for RTCP + sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + try: + sock.bind(("", rtp_port + 1)) + + # Will be opened again below + sock.close() + + # Found our ports + break + except OSError: + # RTCP port is taken + pass + transport, protocol = await loop.create_datagram_endpoint( - lambda: CallPhoneDatagramProtocol(None), - local_addr=(CALL_SRC_IP, 5060), + lambda: VoipCallDatagramProtocol( + None, + source, + destination, + rtp_port, + lambda call_info, rtcp_state: PreRecordMessageProtocol( + "problem.pcm", 96, loop=loop, rtcp_state=rtcp_state + ), + ), + local_addr=(CALL_SRC_IP, CALL_SRC_PORT), ) await protocol.wait_closed() diff --git a/voip_utils/problem.pcm b/voip_utils/problem.pcm new file mode 100644 index 0000000..8873766 Binary files /dev/null and b/voip_utils/problem.pcm differ diff --git a/voip_utils/sip.py b/voip_utils/sip.py index 1a43c44..74eb9e4 100644 --- a/voip_utils/sip.py +++ b/voip_utils/sip.py @@ -49,6 +49,53 @@ def caller_rtcp_port(self) -> int: return self.caller_rtp_port + 1 +@dataclass +class SipEndpoint: + """Information about a SIP endpoint.""" + + host: str + port: int + username: str + description: str + + @property + def sip_uri(self) -> str: + """Return the URI for the SIP endpoint.""" + if self.username is not None: + return f"sip:{self.username}@{self.host}:{self.port}" + else: + return f"sip:{self.host}:{self.port}" + + +def get_rtp_port(body: str) -> Tuple[Optional[int], Optional[int]]: + body_lines = body.splitlines() + rtp_port = None + opus_payload_type = None + for line in body_lines: + line = line.strip() + if not line: + continue + + key, _, value = line.partition("=") + if key == "m": + parts = value.split() + if parts[0] == "audio": + rtp_port = int(parts[1]) + elif key == "a" and value.startswith("rtpmap:"): + # a=rtpmap:123 opus/48000/2 + codec_str = value.split(":", maxsplit=1)[1] + codec_parts = codec_str.split() + if (len(codec_parts) > 1) and ( + codec_parts[1].lower().startswith("opus") + ): + opus_payload_type = int(codec_parts[0]) + _LOGGER.debug( + "Detected OPUS payload type as %s", opus_payload_type + ) + + return rtp_port, opus_payload_type + + class SipDatagramProtocol(asyncio.DatagramProtocol, ABC): """UDP server for the Session Initiation Protocol (SIP).""" @@ -286,13 +333,14 @@ def _parse_sip( return method, ruri, headers, body -CALL_SRC_IP = "192.168.1.100" -CALL_VIA_IP = "192.168.1.101" -CALL_DEST_IP = "192.168.1.102" - class CallPhoneDatagramProtocol(asyncio.DatagramProtocol, ABC): def __init__( - self, sdp_info: SdpInfo, loop: Optional[asyncio.AbstractEventLoop] = None + self, + sdp_info: SdpInfo, + source: SipEndpoint, + dest: SipEndpoint, + rtp_port: int, + loop: Optional[asyncio.AbstractEventLoop] = None, ) -> None: self.sdp_info = sdp_info self.transport = None @@ -301,23 +349,29 @@ def __init__( self._session_id = str(time.monotonic_ns()) self._session_version = str(time.monotonic_ns()) self._call_id = str(time.monotonic_ns()) - self._request_uri = f"sip:user@{CALL_SRC_IP}" + self._source_endpoint = source + self._dest_endpoint = dest + self._rtp_port = rtp_port + self._request_uri = f"sip:{dest.username}@{dest.host}:{dest.port}" def connection_made(self, transport): self.transport = transport - username = "test2" - sdp_lines = [ "v=0", - f"o={username} {self._session_id} {self._session_version} IN IP4 {CALL_SRC_IP}", - "s=SIP Call", - f"c=IN IP4 {CALL_SRC_IP}", + f"o={self._source_endpoint.username} {self._session_id} {self._session_version} IN IP4 {self._source_endpoint.host}", + "s=Talk", + f"c=IN IP4 {self._source_endpoint.host}", "t=0 0", - "m=audio 5004 RTP/AVP 123", + f"m=audio {self._rtp_port} RTP/AVP 0 96 101 103 104 123", "a=sendrecv", + "a=rtpmap:96 opus/48000/2", + "a=fmtp:96 useinbandfec=0", "a=rtpmap:123 opus/48000/2", "a=fmtp:123 maxplaybackrate=16000", + "a=rtpmap:101 telephone-event/48000", + "a=rtpmap:103 telephone-event/16000", + "a=rtpmap:104 telephone-event/8000", "a=ptime:20", "", ] @@ -326,9 +380,10 @@ def connection_made(self, transport): invite_lines = [ f"INVITE {self._request_uri} SIP/2.0", - f"Via: SIP/2.0/UDP {CALL_VIA_IP}", - f"From: ", - f"To: ", + f"Via: SIP/2.0/UDP {self._source_endpoint.host}:{self._source_endpoint.port}", + f"From: ", + f"Contact: ", + f"To: ", f"Call-ID: {self._call_id}", "CSeq: 50 INVITE", "User-Agent: test-agent 1.0", @@ -345,7 +400,7 @@ def connection_made(self, transport): self.transport.sendto( invite_bytes + sdp_bytes, - (CALL_DEST_IP, 5060), + (self._dest_endpoint.host, self._dest_endpoint.port), ) def datagram_received(self, data: bytes, addr): @@ -359,36 +414,157 @@ def datagram_received(self, data: bytes, addr): line = line.strip() if i == 0: _version, code, response_type = line.split(maxsplit=2) + _LOGGER.debug( + "Version=%s, Code=%s, response_type=%s", + _version, + code, + response_type, + ) if (code == "200") and (response_type == "OK"): is_ok = True - else: - _LOGGER.debug("Skipping message: %s", line) + elif code == "401": + _LOGGER.debug( + "Got 401 Unauthorized response, should attempt authentication here..." + ) + #register_lines = [ + # f"REGISTER {self._request_uri} SIP/2.0", + # f"Via: SIP/2.0/UDP {self._source_endpoint.host}:{self._source_endpoint.port}", + # f"From: ", + # f"Contact: ", + # f"To: ", + # f"Call-ID: {self._call_id}", + # "CSeq: 51 REGISTER", + # "Authorization: ", + # "User-Agent: test-agent 1.0", + # "Allow: INVITE, ACK, OPTIONS, CANCEL, BYE, SUBSCRIBE, NOTIFY, INFO, REFER, UPDATE", + # "", + #] + elif _version == "BYE": + _LOGGER.debug("Received BYE message: %s", line) + if self.transport is not None: + # Acknowledge the BYE message, otherwise the phone will keep sending it + ( + protocol, + code, + reason, + headers, + body, + ) = self._parse_sip_reply(response_text) + _LOGGER.debug( + "Parsed response protocol=%s code=%s reason=%s headers=[%s] body=[%s]", + protocol, + code, + reason, + headers, + body, + ) + remote_rtp_port, opus_payload_type = get_rtp_port(body) + via_header = headers["via"] + from_header = headers["from"] + to_header = headers["to"] + callid_header = headers["call-id"] + cseq_header = headers["cseq"] + ok_lines = [ + "SIP/2.0 200 OK", + f"Via: {via_header}", + f"From: {from_header}", + f"To: {to_header}", + f"Call-ID: {callid_header}", + f"CSeq: {cseq_header}", + "User-Agent: test-agent 1.0", + "Content-Length: 0", + ] + ok_text = _CRLF.join(ok_lines) + _CRLF + ok_bytes = ok_text.encode("utf-8") + # We should probably tell the associated RTP server to shutdown at this point, assuming we aren't reusing it for other calls + _LOGGER.debug("Sending OK for BYE message: %s", ok_text) + self.transport.sendto( + ok_bytes, + (self._dest_endpoint.host, self._dest_endpoint.port), + ) + + self.transport.close() + self.transport = None + else: + _LOGGER.debug("Skipping message: %s", line) elif not line: break if is_ok: _LOGGER.debug("Got OK message") if self.transport is not None: - bye_lines = [ - f"BYE {self._request_uri} SIP/2.0", - f"Via: SIP/2.0/UDP {CALL_VIA_IP}", - f"From: ", - f"To: ", + protocol, code, reason, headers, body = self._parse_sip_reply( + response_text + ) + _LOGGER.debug( + "Parsed response protocol=%s code=%s reason=%s headers=[%s] body=[%s]", + protocol, + code, + reason, + headers, + body, + ) + remote_rtp_port, opus_payload_type = get_rtp_port(body) + to_header = headers["to"] + ack_lines = [ + f"ACK {self._request_uri} SIP/2.0", + f"Via: SIP/2.0/UDP {self._source_endpoint.host}:{self._source_endpoint.port}", + f"From: ", + f"To: {to_header}", f"Call-ID: {self._call_id}", - "CSeq: 51 BYE", + "CSeq: 50 ACK", "User-Agent: test-agent 1.0", "Content-Length: 0", - "", ] - bye_text = _CRLF.join(bye_lines) + _CRLF - bye_bytes = bye_text.encode("utf-8") - self.transport.sendto(bye_bytes, (CALL_DEST_IP, 5060)) - - self.transport.close() - self.transport = None + ack_text = _CRLF.join(ack_lines) + _CRLF + ack_bytes = ack_text.encode("utf-8") + self.transport.sendto( + ack_bytes, (self._dest_endpoint.host, self._dest_endpoint.port) + ) + + # The call been answered, proceed with desired action here + self.on_call( + CallInfo( + caller_ip=self._dest_endpoint.host, + caller_uri=self._dest_endpoint.sip_uri, + caller_name=self._dest_endpoint.description, + caller_sip_port=self._dest_endpoint.port, + caller_rtp_port=remote_rtp_port, + server_ip=self._dest_endpoint.host, + headers=headers, + opus_payload_type=opus_payload_type, # Should probably update this to eventually support more codecs + ) + ) except Exception: _LOGGER.exception("Unexpected error handling SIP response") + @abstractmethod + def on_call(self, call_info: CallInfo): + """Handle outgoing calls.""" + + def hang_up(self): + """Hang up the call when finished""" + bye_lines = [ + f"BYE {self._request_uri} SIP/2.0", + f"Via: SIP/2.0/UDP {self._source_endpoint.host}:{self._source_endpoint.port}", + f"From: ", + f"To: " + f"Call-ID: {self._call_id}", + "CSeq: 51 BYE", + "User-Agent: test-agent 1.0", + "Content-Length: 0", + "", + ] + _LOGGER.debug("Hanging up...") + bye_text = _CRLF.join(bye_lines) + _CRLF + bye_bytes = bye_text.encode("utf-8") + self.transport.sendto( + bye_bytes, (self._dest_endpoint.host, self._dest_endpoint.port) + ) + + self.transport.close() + self.transport = None + def connection_lost(self, exc): """Signal wait_closed when transport is completely closed.""" self._loop.call_soon_threadsafe(self._closed_event.set) @@ -396,3 +572,35 @@ def connection_lost(self, exc): async def wait_closed(self) -> None: """Wait for connection_lost to be called.""" await self._closed_event.wait() + + def _parse_sip_reply( + self, message: str + ) -> Tuple[Optional[str], Optional[str], Optional[str], Dict[str, str], str]: + """Parse SIP message and return method, headers, and body.""" + lines = message.splitlines() + + protocol: Optional[str] = None + code: Optional[str] = None + reason: Optional[str] = None + headers: dict[str, str] = {} + offset: int = 0 + + # See: https://datatracker.ietf.org/doc/html/rfc3261 + for i, line in enumerate(lines): + if line: + offset += len(line) + len(_CRLF) + + if i == 0: + line_parts = line.split() + protocol = line_parts[0] + code = line_parts[1] + reason = line_parts[2] + elif not line: + break + else: + key, value = line.split(":", maxsplit=1) + headers[key.lower()] = value.strip() + + body = message[offset:] + + return protocol, code, reason, headers, body diff --git a/voip_utils/voip.py b/voip_utils/voip.py index 008d608..246909c 100644 --- a/voip_utils/voip.py +++ b/voip_utils/voip.py @@ -208,10 +208,12 @@ def send_audio( ) -> None: """Send audio from WAV file in chunks over RTP.""" if not self._is_connected: + _LOGGER.debug("Not connected, can't send audio") return addr = addr or self.addr if addr is None: + _LOGGER.debug("No destination address, can't send audio") raise ValueError("Destination address not set") bytes_per_sample = width * channels @@ -222,6 +224,7 @@ def send_audio( samples_left = len(audio_bytes) // bytes_per_sample rtp_packets: list[bytes] = [] while samples_left > 0: + _LOGGER.debug("Preparing audio chunk to send") bytes_offset = sample_offset * bytes_per_sample chunk = audio_bytes[bytes_offset : bytes_offset + bytes_per_frame] samples_in_chunk = len(chunk) // bytes_per_sample @@ -239,6 +242,7 @@ def send_audio( sample_offset += samples_in_chunk # Pause before sending to allow time for user to pick up phone. + _LOGGER.debug("Pause before sending") time.sleep(silence_before) # Send RTP in a steady stream, delaying between each packet to simulate real-time audio