Skip to content

Commit

Permalink
Resolves #106
Browse files Browse the repository at this point in the history
[ADD] Added REFER request generation
[ADD] Added missing type annotations
[ADD] Added support for Refer-To, Target-Dialog, and Refer-Sub headers
[ADD] Added parcing for Require header. Currently does not fail for unmet
      requirements
[ADD] Added both in and out of dialog transfer feature for phone calls
[ADD] Added tests for REFER and NOTIFY requests
[CHANGE] Made traceback on some errors better
[CHANGE] Changed __get_tfc_header to __get_uri_header
[CHANGE] __gen_uri now supports empty user string for direct ip calls
[CHANGE] Changed SIPMessage.to to a property with a doc string so it would not
         be confused with the header
  • Loading branch information
tayler6000 committed Jan 2, 2024
1 parent 5e0c100 commit 2dd3c1b
Show file tree
Hide file tree
Showing 6 changed files with 428 additions and 19 deletions.
166 changes: 159 additions & 7 deletions pyVoIP/SIP/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
InvalidAccountInfoError,
)
from pyVoIP.helpers import Counter
from pyVoIP.networking.nat import NAT
from pyVoIP.networking.nat import NAT, AddressType
from pyVoIP.SIP.message import (
SIPMessage,
SIPStatus,
Expand Down Expand Up @@ -83,6 +83,7 @@ def __init__(
self.subscribeCounter = Counter()
self.byeCounter = Counter()
self.messageCounter = Counter()
self.referCounter = Counter()
self.callID = Counter()
self.sessID = Counter()

Expand Down Expand Up @@ -232,7 +233,7 @@ def stop(self) -> None:
if self.s:
self.s.close()

def sendto(self, request: str, address=None):
def sendto(self, request: str, address=None) -> "VoIPConnection":
if address is None:
address = (self.server, self.port)
return self.s.send(request.encode("utf8"))
Expand Down Expand Up @@ -298,17 +299,16 @@ def __gen_uri(
method = method.lower()

assert method in ["sip", "sips"], "method must be sip or sips"
assert (
type(user) is str and len(user) > 0
), "User must be a non-empty string"
assert (
type(host) is str and len(host) > 0
), "User must be a non-empty string"
), "Host must be a non-empty string"

password = f":{password}" if password else ""
port_str = f":{port}" if port != 5060 else ""
params = params if params else ""
return f"{method}:{user}{password}@{host}{port_str}{params}"
if type(user) is str and len(user) > 0:
return f"{method}:{user}{password}@{host}{port_str}{params}"
return f"{method}:{host}{port_str}{params}"

def __gen_via(self, to: str, branch: str) -> str:
# SIP/2.0/ should still be the prefix even if using TLS per RFC 3261
Expand Down Expand Up @@ -833,6 +833,158 @@ def gen_invite(

return invRequest

def gen_refer(
self,
request: SIPMessage,
user: Optional[str] = None,
uri: Optional[str] = None,
blind=True,
new_dialog=True,
) -> str:
if new_dialog:
return self.__gen_refer_new_dialog(request, user, uri, blind)
return self.__gen_refer_same_dialog(request, user, uri, blind)

def __gen_refer_new_dialog(
self,
request: SIPMessage,
user: Optional[str] = None,
uri: Optional[str] = None,
blind=True,
) -> str:
if user is None and uri is None:
raise RuntimeError("Must specify a user or a URI to transfer to")
call_id = self.gen_call_id()
self.tagLibrary[call_id] = self.gen_tag()
tag = self.tagLibrary[call_id]

c = request.headers["Contact"]["uri"]
refer = f"REFER {c} SIP/2.0\r\n"
refer += self._gen_response_via_header(request)
refer += "Max-Forwards: 70\r\n"

method = "sips" if self.transport_mode is TransportMode.TLS else "sip"
refer += self.__gen_from_to(
"From",
self.user,
self.nat.get_host(self.bind_ip),
method,
header_parms=f";tag={tag}",
)

# Determine if To or From is local to decide who the refer is to
to_local = (
self.nat.check_host(request.headers["To"]["host"])
== AddressType.LOCAL
)

if to_local:
to_user = request.headers["From"]["user"]
to_host = request.headers["From"]["host"]
method = request.headers["From"]["uri-type"]
to_display_name = request.headers["From"]["display-name"]
to_password = request.headers["From"]["password"]
to_port = request.headers["From"]["port"]
remote_tag = request.headers["From"]["tag"]
else:
to_user = request.headers["To"]["user"]
to_host = request.headers["To"]["host"]
method = request.headers["To"]["uri-type"]
to_display_name = request.headers["To"]["display-name"]
to_password = request.headers["To"]["password"]
to_port = request.headers["To"]["port"]
remote_tag = request.headers["To"]["tag"]

refer += self.__gen_from_to(
"To",
to_user,
to_host,
method,
to_display_name,
to_password,
to_port,
)

refer += f"Call-ID: {call_id}\r\n"
refer += f"CSeq: {self.referCounter.next()} REFER\r\n"
refer += f"Allow: {(', '.join(pyVoIP.SIPCompatibleMethods))}\r\n"
if user:
method = (
"sips" if self.transport_mode is TransportMode.TLS else "sip"
)
uri = self.__gen_uri(
method,
user,
self.nat.get_host(self.server),
port=self.bind_port,
)
refer += f"Refer-To: {uri}\r\n"
sess_call_id = request.headers["Call-ID"]
local_tag = self.tagLibrary[sess_call_id]
refer += (
f"Target-Dialog: {sess_call_id};local-tag={local_tag}"
+ f";remote-tag={remote_tag}\r\n"
)
refer += self.__gen_contact(
method,
self.user,
self.nat.get_host(self.server),
port=self.bind_port,
)
if blind:
refer += "Refer-Sub: false\r\n"
refer += "Supported: norefersub\r\n"
refer += self.__gen_user_agent()
refer += "Content-Length: 0\r\n\r\n"
return refer

def __gen_refer_same_dialog(
self,
request: SIPMessage,
user: Optional[str] = None,
uri: Optional[str] = None,
blind=True,
) -> str:
tag = self.tagLibrary[request.headers["Call-ID"]]
c = request.headers["Contact"]["uri"]
refer = f"REFER {c} SIP/2.0\r\n"
refer += self._gen_response_via_header(request)
refer += "Max-Forwards: 70\r\n"
_from = request.headers["From"]
to = request.headers["To"]
if request.headers["From"]["tag"] == tag:
refer += self.__gen_from_to_via_request(request, "From", tag)
refer += f"To: {to['raw']}\r\n"
else:
refer += f"To: {_from['raw']}\r\n"
refer += self.__gen_from_to_via_request(
request, "To", tag, dsthdr="From"
)
refer += f"Call-ID: {request.headers['Call-ID']}\r\n"
refer += f"CSeq: {self.referCounter.next()} REFER\r\n"
refer += f"Allow: {(', '.join(pyVoIP.SIPCompatibleMethods))}\r\n"
method = "sips" if self.transport_mode is TransportMode.TLS else "sip"
if user:
uri = self.__gen_uri(
method,
user,
self.nat.get_host(self.server),
port=self.bind_port,
)
refer += f"Refer-To: {uri}\r\n"
refer += self.__gen_contact(
method,
self.user,
self.nat.get_host(self.server),
port=self.bind_port,
)
if blind:
refer += "Refer-Sub: false\r\n"
refer += "Supported: norefersub\r\n"
refer += self.__gen_user_agent()
refer += "Content-Length: 0\r\n\r\n"
return refer

def _gen_bye_cancel(self, request: SIPMessage, cmd: str) -> str:
tag = self.tagLibrary[request.headers["Call-ID"]]
c = request.headers["Contact"]["uri"]
Expand Down
46 changes: 38 additions & 8 deletions pyVoIP/SIP/message.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from enum import Enum, IntEnum
from pyVoIP import regex
from pyVoIP.SIP.error import SIPParseError
from pyVoIP.types import TFC_HEADER
from pyVoIP.types import URI_HEADER
from typing import Any, Callable, Dict, List, Optional, Union
import pyVoIP

Expand Down Expand Up @@ -291,7 +291,6 @@ def __init__(self, data: bytes):
self.SIPCompatibleMethods = pyVoIP.SIPCompatibleMethods
self.heading: List[str] = []
self.type: Optional[SIPMessageType] = None
self.to: Optional[TFC_HEADER] = None
self.status = SIPStatus(491)
self.headers: Dict[str, Any] = {"Via": []}
self.body: Dict[str, Any] = {}
Expand All @@ -317,9 +316,20 @@ def __init__(self, data: bytes):
self.parse(data)
except Exception as e:
if type(e) is not SIPParseError:
raise SIPParseError(e)
raise SIPParseError(e) from e
raise

@property
def to(self) -> Optional[URI_HEADER]:
"""
The to property specifies the URI in the first line of a SIP request.
"""
return self._to

@to.setter
def to(self, value: str) -> None:
self._to = value

def summary(self) -> str:
data = ""
data += f"{' '.join(self.heading)}\n\n"
Expand Down Expand Up @@ -362,7 +372,7 @@ def parse(self, data: bytes) -> None:
)
"""

def __get_tfc_header(self, data: str) -> TFC_HEADER:
def __get_uri_header(self, data: str) -> URI_HEADER:
info = data.split(";tag=")
tag = ""
if len(info) >= 2:
Expand Down Expand Up @@ -448,14 +458,14 @@ def parse_header(self, header: str, data: str) -> None:
else:
_via[x] = None
self.headers["Via"].append(_via)
elif header in ["To", "From", "Contact"]:
self.headers[header] = self.__get_tfc_header(data)
elif header in ["To", "From", "Contact", "Refer-To"]:
self.headers[header] = self.__get_uri_header(data)
elif header == "CSeq":
self.headers[header] = {
"check": int(data.split(" ")[0]),
"method": data.split(" ")[1],
}
elif header == "Allow" or header == "Supported":
elif header in ["Allow", "Supported", "Require"]:
self.headers[header] = data.split(", ")
elif header == "Call-ID":
self.headers[header] = data
Expand Down Expand Up @@ -483,6 +493,26 @@ def parse_header(self, header: str, data: str) -> None:
header_data[var] = data.strip('"')
self.headers[header] = header_data
self.authentication = header_data
elif header == "Target-Dialog":
# Target-Dialog (tdialog) is specified in RFC 4538
params = data.split(";")
header_data: Dict[str, Any] = {
"callid": params.pop(0)
} # key is callid to be consitenent with RFC 4538 Section 7
for x in params:
y = x.split("=")
header_data[y[0]] = y[1]
self.headers[header] = header_data
elif header == "Refer-Sub":
# Refer-Sub (norefersub) is specified in RFC 4488
params = data.split(";")
header_data: Dict[str, Any] = {
"value": True if params.pop(0) == "true" else False
} # BNF states extens are possible
for x in params:
y = x.split("=")
header_data[y[0]] = y[1]
self.headers[header] = header_data
else:
try:
self.headers[header] = int(data)
Expand Down Expand Up @@ -790,7 +820,7 @@ def parse_sip_request(self, data: bytes) -> None:
raise SIPParseError(f"SIP Version {self.version} not compatible.")

self.method = self.heading[0]
self.to = self.__get_tfc_header(self.heading[1])
self.to = self.__get_uri_header(self.heading[1])

self.parse_raw_header(headers_raw, self.parse_header)

Expand Down
Loading

0 comments on commit 2dd3c1b

Please sign in to comment.