Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add ConnectOptions dataclass #610

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 75 additions & 2 deletions nats/aio/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import ssl
import time
import string
from dataclasses import dataclass
from dataclasses import dataclass, field
from email.parser import BytesParser
from random import shuffle
from secrets import token_hex
Expand Down Expand Up @@ -195,6 +195,43 @@ async def _default_error_callback(ex: Exception) -> None:
"""
_logger.error('nats: encountered error', exc_info=ex)

@dataclass
class ConnectOptions:
servers: Union[str, List[str]] = field(default_factory=lambda: ["nats://localhost:4222"])
error_cb: Optional[ErrorCallback] = None
disconnected_cb: Optional[Callback] = None
closed_cb: Optional[Callback] = None
discovered_server_cb: Optional[Callback] = None
reconnected_cb: Optional[Callback] = None
name: Optional[str] = None
pedantic: bool = False
verbose: bool = False
allow_reconnect: bool = True
connect_timeout: int = DEFAULT_CONNECT_TIMEOUT
reconnect_time_wait: int = DEFAULT_RECONNECT_TIME_WAIT
max_reconnect_attempts: int = DEFAULT_MAX_RECONNECT_ATTEMPTS
ping_interval: int = DEFAULT_PING_INTERVAL
max_outstanding_pings: int = DEFAULT_MAX_OUTSTANDING_PINGS
dont_randomize: bool = False
flusher_queue_size: int = DEFAULT_MAX_FLUSHER_QUEUE_SIZE
no_echo: bool = False
tls: Optional[ssl.SSLContext] = None
tls_hostname: Optional[str] = None
tls_handshake_first: bool = False
user: Optional[str] = None
password: Optional[str] = None
token: Optional[str] = None
drain_timeout: int = DEFAULT_DRAIN_TIMEOUT
signature_cb: Optional[SignatureCallback] = None
user_jwt_cb: Optional[JWTCallback] = None
user_credentials: Optional[Credentials] = None
nkeys_seed: Optional[str] = None
nkeys_seed_str: Optional[str] = None
inbox_prefix: Union[str, bytes] = DEFAULT_INBOX_PREFIX
pending_size: int = DEFAULT_PENDING_SIZE
flush_timeout: Optional[float] = None



class Client:
"""
Expand Down Expand Up @@ -330,6 +367,7 @@ async def connect(
inbox_prefix: Union[str, bytes] = DEFAULT_INBOX_PREFIX,
pending_size: int = DEFAULT_PENDING_SIZE,
flush_timeout: Optional[float] = None,
config: Optional[ConnectOptions] = None
) -> None:
"""
Establishes a connection to NATS.
Expand Down Expand Up @@ -419,6 +457,41 @@ async def subscribe_handler(msg):
asyncio.run(main())

"""

if config:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bigger refactor but maybe merge into the config rather than out of it?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

refactored in e668f54

servers = config.servers
error_cb = config.error_cb
disconnected_cb = config.disconnected_cb
closed_cb = config.closed_cb
discovered_server_cb = config.discovered_server_cb
reconnected_cb = config.reconnected_cb
name = config.name
pedantic = config.pedantic
verbose = config.verbose
allow_reconnect = config.allow_reconnect
connect_timeout = config.connect_timeout
reconnect_time_wait = config.reconnect_time_wait
max_reconnect_attempts = config.max_reconnect_attempts
ping_interval = config.ping_interval
max_outstanding_pings = config.max_outstanding_pings
dont_randomize = config.dont_randomize
flusher_queue_size = config.flusher_queue_size
no_echo = config.no_echo
tls = config.tls
tls_hostname = config.tls_hostname
tls_handshake_first = config.tls_handshake_first
user = config.user
password = config.password
token = config.token
drain_timeout = config.drain_timeout
signature_cb = config.signature_cb
user_jwt_cb = config.user_jwt_cb
user_credentials = config.user_credentials
nkeys_seed = config.nkeys_seed
nkeys_seed_str = config.nkeys_seed_str
inbox_prefix = config.inbox_prefix
pending_size = config.pending_size
flush_timeout = config.flush_timeout

for cb in [error_cb, disconnected_cb, closed_cb, reconnected_cb,
discovered_server_cb]:
Expand Down Expand Up @@ -1261,7 +1334,7 @@ async def _flush_pending(
except asyncio.CancelledError:
pass

def _setup_server_pool(self, connect_url: Union[List[str]]) -> None:
def _setup_server_pool(self, connect_url: Union[str, List[str]]) -> None:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Intentional change?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes this throws a typing error -- Union[List[str]] is just List[str], and _setup_server_pool accepts both str and List[str] (via isinstance(...).

It is not exactly related, but is in the connect call and should be fixed... do you think it belongs in a separate PR?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay, just checking. I'm doing a lint fix PR so that should take care of it. Can just leave it in for now.

if isinstance(connect_url, str):
try:
if "nats://" in connect_url or "tls://" in connect_url:
Expand Down