Skip to content

Commit

Permalink
Merge pull request #28 from home-assistant-libs/synesthesiam-20250128…
Browse files Browse the repository at this point in the history
…-cancel-call

Add cancel_call
  • Loading branch information
synesthesiam authored Jan 29, 2025
2 parents ea326a4 + dfd5d16 commit 1e9a028
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 2 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Changelog

## 0.3.1

- Add cancel_call to stop ringing

## 0.3.0

- Add support for outgoing calls (@jaminh)
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"

[project]
name = "voip-utils"
version = "0.3.0"
version = "0.3.1"
license = {text = "Apache-2.0"}
description = "Voice over IP Utilities"
readme = "README.md"
Expand Down
37 changes: 36 additions & 1 deletion voip_utils/sip.py
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ def __init__(self, sdp_info: SdpInfo) -> None:

def outgoing_call(
self, source: SipEndpoint, destination: SipEndpoint, rtp_port: int
):
) -> CallInfo:
"""Make an outgoing call from the given source endpoint to the destination endpoint, using the rtp_port for the local RTP port of the call."""
if self.transport is None:
raise RuntimeError("No transport available for outgoing call.")
Expand Down Expand Up @@ -289,6 +289,14 @@ def outgoing_call(
(destination.host, destination.port),
)

return CallInfo(
caller_endpoint=destination,
local_endpoint=source,
caller_rtp_port=rtp_port,
server_ip=source.host,
headers={"call-id": call_id},
)

def hang_up(self, call_info: CallInfo):
"""Hang up the call when finished"""
if self.transport is None:
Expand All @@ -315,6 +323,33 @@ def hang_up(self, call_info: CallInfo):
self._end_outgoing_call(call_info.headers["call-id"])
self.on_hangup(call_info)

def cancel_call(self, call_info: CallInfo):
"""Cancel an outgoing call while it's still ringing."""
if self.transport is None:
raise RuntimeError("No transport available for sending cancel.")

cancel_lines = [
f"CANCEL {call_info.caller_endpoint.uri} SIP/2.0",
f"Via: SIP/2.0/UDP {call_info.local_endpoint.host}:{call_info.local_endpoint.port}",
f"From: {call_info.local_endpoint.sip_header}",
f"To: {call_info.caller_endpoint.sip_header}",
f"Call-ID: {call_info.headers['call-id']}",
"CSeq: 51 CANCEL",
f"User-Agent: {VOIP_UTILS_AGENT} 1.0",
"Content-Length: 0",
"",
]
_LOGGER.debug("Canceling call...")
cancel_text = _CRLF.join(cancel_lines) + _CRLF
cancel_bytes = cancel_text.encode("utf-8")
self.transport.sendto(
cancel_bytes,
(call_info.caller_endpoint.host, call_info.caller_endpoint.port),
)

self._end_outgoing_call(call_info.headers["call-id"])
self.on_hangup(call_info)

def _register_outgoing_call(self, call_id: str, rtp_port: int):
"""Register the RTP port associated with an outgoing call."""
self._outgoing_calls[call_id] = rtp_port
Expand Down

0 comments on commit 1e9a028

Please sign in to comment.