From fc34f9c323d51839edd88a44a223cccb621eb0cd Mon Sep 17 00:00:00 2001 From: Thomas Uram Date: Mon, 6 Feb 2023 16:59:35 -0600 Subject: [PATCH 1/3] Prep for prerelease; lint fixes --- balsam/__init__.py | 2 +- balsam/_api/models.py | 2 +- balsam/site/launcher/_serial_mode_master.py | 18 +++++++++--------- balsam/site/launcher/_serial_mode_worker.py | 14 +++++++------- 4 files changed, 18 insertions(+), 18 deletions(-) diff --git a/balsam/__init__.py b/balsam/__init__.py index c5aec0d9..47b27195 100644 --- a/balsam/__init__.py +++ b/balsam/__init__.py @@ -1,4 +1,4 @@ from balsam.util import config_root_logger -__version__ = "0.7.0.a20" +__version__ = "0.7.0.a21" config_root_logger() diff --git a/balsam/_api/models.py b/balsam/_api/models.py index 6a71d3e7..8e23bbca 100644 --- a/balsam/_api/models.py +++ b/balsam/_api/models.py @@ -1,5 +1,5 @@ # This file was auto-generated via /Users/turam/opt/miniconda3/bin/python balsam/schemas/api_generator.py -# [git rev 3fcc4a5] +# [git rev 1e2180e] # Do *not* make changes to the API by changing this file! import datetime diff --git a/balsam/site/launcher/_serial_mode_master.py b/balsam/site/launcher/_serial_mode_master.py index 1cc08874..fa7b84d8 100644 --- a/balsam/site/launcher/_serial_mode_master.py +++ b/balsam/site/launcher/_serial_mode_master.py @@ -117,9 +117,9 @@ def acquire_jobs(self, max_jobs: int) -> List[Dict[str, Any]]: def handle_request(self) -> None: msg = self.socket.recv_json() - done_ids: List[int] = msg["done"] # type: ignore - error_logs: List[Tuple[int, int, str]] = msg["error"] # type: ignore - started_ids: List[int] = msg["started"] # type: ignore + done_ids: List[int] = msg["done"] + error_logs: List[Tuple[int, int, str]] = msg["error"] + started_ids: List[int] = msg["started"] self.update_job_states(done_ids, error_logs, started_ids) finished_ids = set(done_ids) | set(log[0] for log in error_logs) @@ -127,8 +127,8 @@ def handle_request(self) -> None: self.active_ids -= finished_ids self.num_outstanding_jobs -= len(finished_ids) - src = msg["source"] # type: ignore - max_jobs: int = msg["request_num_jobs"] # type: ignore + src = msg["source"] + max_jobs: int = msg["request_num_jobs"] logger.debug(f"Worker {src} requested {max_jobs} jobs") new_job_specs = self.acquire_jobs(max_jobs) @@ -154,9 +154,9 @@ def idle_check(self) -> None: def run(self) -> None: logger.debug("In master run") try: - self.context = zmq.Context() - self.context.setsockopt(zmq.LINGER, 0) - self.socket = self.context.socket(zmq.REP) + self.context = zmq.Context() # type: ignore + self.context.setsockopt(zmq.LINGER, 0) # type: ignore + self.socket = self.context.socket(zmq.REP) # type: ignore self.socket.bind(f"tcp://*:{self.master_port}") logger.debug("Master ZMQ socket bound.") @@ -173,7 +173,7 @@ def run(self) -> None: self.shutdown() self.socket.setsockopt(zmq.LINGER, 0) self.socket.close(linger=0) - self.context.term() + self.context.term() # type: ignore logger.info("shutdown done: ensemble master exit gracefully") def shutdown(self) -> None: diff --git a/balsam/site/launcher/_serial_mode_worker.py b/balsam/site/launcher/_serial_mode_worker.py index b861d63a..d8b8861d 100644 --- a/balsam/site/launcher/_serial_mode_worker.py +++ b/balsam/site/launcher/_serial_mode_worker.py @@ -150,7 +150,7 @@ def exit(self) -> None: self.cleanup_proc(id, timeout=self.CHECK_PERIOD) self.socket.setsockopt(zmq.LINGER, 0) self.socket.close(linger=0) - self.context.term() + self.context.term() # type: ignore def start_jobs(self) -> List[int]: started_ids = [] @@ -193,12 +193,12 @@ def cycle(self) -> bool: response_msg = self.socket.recv_json() logger.debug("Worker response received") - if response_msg.get("exit"): # type: ignore + if response_msg.get("exit"): logger.info(f"Worker {self.hostname} received exit message: break") return False - if response_msg.get("new_jobs"): # type: ignore - self.runnable_cache.update({job["id"]: job for job in response_msg["new_jobs"]}) # type: ignore + if response_msg.get("new_jobs"): + self.runnable_cache.update({job["id"]: job for job in response_msg["new_jobs"]}) logger.debug( f"{self.hostname} fraction available: {self.node_manager.aggregate_free_nodes()} " @@ -208,9 +208,9 @@ def cycle(self) -> bool: return True def run(self) -> None: - self.context = zmq.Context() - self.context.setsockopt(zmq.LINGER, 0) - self.socket = self.context.socket(zmq.REQ) + self.context = zmq.Context() # type: ignore + self.context.setsockopt(zmq.LINGER, 0) # type: ignore + self.socket = self.context.socket(zmq.REQ) # type: ignore self.socket.connect(self.master_address) logger.debug(f"Worker connected to {self.master_address}") From ce4bdcee95896c64d303cebed3fcc9a2c1cdd598 Mon Sep 17 00:00:00 2001 From: Thomas Uram Date: Mon, 6 Feb 2023 17:05:47 -0600 Subject: [PATCH 2/3] Lint fixes --- balsam/_api/manager.py | 2 -- balsam/_api/models.py | 2 +- balsam/client/requests_client.py | 1 - balsam/cmdline/_launcher.py | 2 +- balsam/platform/compute_node/alcf_cooley_node.py | 1 - balsam/platform/compute_node/alcf_polaris_node.py | 1 - balsam/platform/compute_node/alcf_sunspot_node.py | 1 - balsam/platform/compute_node/alcf_thetagpu_node.py | 1 - balsam/platform/compute_node/alcf_thetaknl_node.py | 1 - balsam/platform/compute_node/compute_node.py | 1 - balsam/platform/compute_node/default.py | 1 - balsam/platform/compute_node/nersc_corihas_node.py | 1 - balsam/platform/compute_node/nersc_coriknl_node.py | 1 - balsam/platform/compute_node/nersc_perlmutter_gpu.py | 1 - balsam/platform/compute_node/summit_node.py | 1 - balsam/platform/tests/test_mpirun.py | 4 ---- balsam/platform/tests/test_scheduler.py | 4 ---- balsam/server/models/crud/sessions.py | 1 - balsam/site/launcher/_serial_mode_master.py | 2 +- tests/benchmark/locustfile.py | 2 +- 20 files changed, 4 insertions(+), 27 deletions(-) diff --git a/balsam/_api/manager.py b/balsam/_api/manager.py index 6a202654..2a5e2941 100644 --- a/balsam/_api/manager.py +++ b/balsam/_api/manager.py @@ -161,7 +161,6 @@ def _unpack_list_response(self, response_data: Dict[str, Any]) -> Tuple[int, Lis def _fetch_pages( self, filters: Dict[str, Any], ordering: Optional[str], limit: Optional[int], offset: Optional[int] ) -> Tuple[int, List[Dict[str, Any]]]: - base_offset = 0 if offset is None else offset page_size = MAX_PAGE_SIZE if limit is None else min(limit, MAX_PAGE_SIZE) @@ -191,7 +190,6 @@ def _get_list( limit: Optional[int], offset: Optional[int], ) -> Tuple[List[T], int]: - filter_chunks = self._chunk_filters(filters) full_count: int = 0 full_results: List[Dict[str, Any]] = [] diff --git a/balsam/_api/models.py b/balsam/_api/models.py index 8e23bbca..df8cf6a2 100644 --- a/balsam/_api/models.py +++ b/balsam/_api/models.py @@ -1,5 +1,5 @@ # This file was auto-generated via /Users/turam/opt/miniconda3/bin/python balsam/schemas/api_generator.py -# [git rev 1e2180e] +# [git rev fc34f9c] # Do *not* make changes to the API by changing this file! import datetime diff --git a/balsam/client/requests_client.py b/balsam/client/requests_client.py index 723a653b..ccec6656 100644 --- a/balsam/client/requests_client.py +++ b/balsam/client/requests_client.py @@ -21,7 +21,6 @@ class NotAuthenticatedError(Exception): class RequestsClient(RESTClient): - _client_classes: "Dict[str, Type[RequestsClient]]" = {} @staticmethod diff --git a/balsam/cmdline/_launcher.py b/balsam/cmdline/_launcher.py index ddbc88d7..13aeb67c 100644 --- a/balsam/cmdline/_launcher.py +++ b/balsam/cmdline/_launcher.py @@ -157,7 +157,7 @@ def launcher( for proc in launcher_procs: try: proc.wait(timeout=1) - except (subprocess.TimeoutExpired): + except subprocess.TimeoutExpired: pass else: done_procs.append(proc) diff --git a/balsam/platform/compute_node/alcf_cooley_node.py b/balsam/platform/compute_node/alcf_cooley_node.py index 7e71f7ad..393f07f2 100644 --- a/balsam/platform/compute_node/alcf_cooley_node.py +++ b/balsam/platform/compute_node/alcf_cooley_node.py @@ -5,7 +5,6 @@ class CooleyNode(ComputeNode): - cpu_ids = list(range(12)) gpu_ids = list(range(2)) diff --git a/balsam/platform/compute_node/alcf_polaris_node.py b/balsam/platform/compute_node/alcf_polaris_node.py index 1edef7ba..b5283c3b 100644 --- a/balsam/platform/compute_node/alcf_polaris_node.py +++ b/balsam/platform/compute_node/alcf_polaris_node.py @@ -10,7 +10,6 @@ class PolarisNode(ComputeNode): - # turam: confirm number of cpus cpu_ids = list(range(64)) gpu_ids: List[IntStr] = list(range(4)) diff --git a/balsam/platform/compute_node/alcf_sunspot_node.py b/balsam/platform/compute_node/alcf_sunspot_node.py index 511ec06f..715f4e42 100644 --- a/balsam/platform/compute_node/alcf_sunspot_node.py +++ b/balsam/platform/compute_node/alcf_sunspot_node.py @@ -9,7 +9,6 @@ class SunspotNode(ComputeNode): - cpu_ids = list(range(104)) gpu_ids: List[IntStr] diff --git a/balsam/platform/compute_node/alcf_thetagpu_node.py b/balsam/platform/compute_node/alcf_thetagpu_node.py index 0ceb439d..112fc81c 100644 --- a/balsam/platform/compute_node/alcf_thetagpu_node.py +++ b/balsam/platform/compute_node/alcf_thetagpu_node.py @@ -10,7 +10,6 @@ class ThetaGPUNode(ComputeNode): - cpu_ids = list(range(128)) gpu_ids: List[IntStr] = list(range(8)) diff --git a/balsam/platform/compute_node/alcf_thetaknl_node.py b/balsam/platform/compute_node/alcf_thetaknl_node.py index ea6894c0..c17057f0 100644 --- a/balsam/platform/compute_node/alcf_thetaknl_node.py +++ b/balsam/platform/compute_node/alcf_thetaknl_node.py @@ -5,7 +5,6 @@ class ThetaKNLNode(ComputeNode): - cpu_ids = list(range(64)) gpu_ids: List[Union[int, str]] = [] diff --git a/balsam/platform/compute_node/compute_node.py b/balsam/platform/compute_node/compute_node.py index 6cf5d03f..a18b562c 100644 --- a/balsam/platform/compute_node/compute_node.py +++ b/balsam/platform/compute_node/compute_node.py @@ -6,7 +6,6 @@ class ComputeNode: - cpu_ids: List[IntStr] = [] gpu_ids: List[IntStr] = [] diff --git a/balsam/platform/compute_node/default.py b/balsam/platform/compute_node/default.py index 293b9f0f..0d91cbf8 100644 --- a/balsam/platform/compute_node/default.py +++ b/balsam/platform/compute_node/default.py @@ -10,7 +10,6 @@ class DefaultNode(ComputeNode): - cpu_ids = list(range(psutil.cpu_count() or 4)) gpu_ids: List[Union[int, str]] = [] diff --git a/balsam/platform/compute_node/nersc_corihas_node.py b/balsam/platform/compute_node/nersc_corihas_node.py index c8d41ab3..4b92469d 100644 --- a/balsam/platform/compute_node/nersc_corihas_node.py +++ b/balsam/platform/compute_node/nersc_corihas_node.py @@ -5,7 +5,6 @@ class CoriHaswellNode(ComputeNode): - cpu_ids = list(range(32)) gpu_ids: List[Union[int, str]] = [] diff --git a/balsam/platform/compute_node/nersc_coriknl_node.py b/balsam/platform/compute_node/nersc_coriknl_node.py index 617203f7..0e500e77 100644 --- a/balsam/platform/compute_node/nersc_coriknl_node.py +++ b/balsam/platform/compute_node/nersc_coriknl_node.py @@ -7,7 +7,6 @@ class CoriKNLNode(ComputeNode): - cpu_ids: List[IntStr] = list(range(68)) gpu_ids: List[IntStr] = [] diff --git a/balsam/platform/compute_node/nersc_perlmutter_gpu.py b/balsam/platform/compute_node/nersc_perlmutter_gpu.py index c15284ef..74fda925 100644 --- a/balsam/platform/compute_node/nersc_perlmutter_gpu.py +++ b/balsam/platform/compute_node/nersc_perlmutter_gpu.py @@ -5,7 +5,6 @@ class PerlmutterGPUNode(ComputeNode): - cpu_ids = list(range(64)) gpu_ids: List[Union[int, str]] = list(range(4)) diff --git a/balsam/platform/compute_node/summit_node.py b/balsam/platform/compute_node/summit_node.py index e5ac15e5..b079c7a6 100644 --- a/balsam/platform/compute_node/summit_node.py +++ b/balsam/platform/compute_node/summit_node.py @@ -5,7 +5,6 @@ class SummitNode(ComputeNode): - cpu_ids = list(range(42)) gpu_ids = list(range(6)) diff --git a/balsam/platform/tests/test_mpirun.py b/balsam/platform/tests/test_mpirun.py index 7ac1db9d..1c021c7a 100644 --- a/balsam/platform/tests/test_mpirun.py +++ b/balsam/platform/tests/test_mpirun.py @@ -110,7 +110,6 @@ def setUp(self): @staticmethod def parse_output(output_fn): - ranks = [] sizes = [] with open(output_fn) as file: @@ -164,7 +163,6 @@ def setUp(self): @staticmethod def parse_output(output_fn): - ranks = [] sizes = [] with open(output_fn) as file: @@ -218,7 +216,6 @@ def setUp(self): @staticmethod def parse_output(output_fn): - ranks = [] sizes = [] with open(output_fn) as file: @@ -280,7 +277,6 @@ def setUp(self): @staticmethod def parse_output(output_fn): - ranks = [] sizes = [] with open(output_fn) as file: diff --git a/balsam/platform/tests/test_scheduler.py b/balsam/platform/tests/test_scheduler.py index 8acf2b8f..7523e9a6 100644 --- a/balsam/platform/tests/test_scheduler.py +++ b/balsam/platform/tests/test_scheduler.py @@ -14,7 +14,6 @@ def assertInPath(self, exe): self.assertTrue(which_exe is not None, f"'{exe}' not in PATH") def test_submit(self): - # verify script exists self.assertTrue(os.path.exists(self.script_path)) # verify submit command is in path @@ -127,7 +126,6 @@ def tearDown(self): class CobaltTest(SchedulerTestMixin, unittest.TestCase): - submit_script = """!#/usr/bin/env bash echo [$SECONDS] Running test submit script echo [$SECONDS] COBALT_JOBID = $COBALT_JOBID @@ -171,7 +169,6 @@ def tearDown(self): class SlurmTest(SchedulerTestMixin, unittest.TestCase): - submit_script = """#!/usr/bin/env bash -l echo [$SECONDS] Running test submit script echo [$SECONDS] SLURM_JOB_ID = SLURM_JOB_ID @@ -221,7 +218,6 @@ def test_get_backfill_windows(self): class LsfTest(SchedulerTestMixin, unittest.TestCase): - submit_script = """#!/usr/bin/env bash echo [$SECONDS] Running test submit script echo [$SECONDS] LSB_JOBID = $LSB_JOBID diff --git a/balsam/server/models/crud/sessions.py b/balsam/server/models/crud/sessions.py index e6e175b5..ff40985a 100644 --- a/balsam/server/models/crud/sessions.py +++ b/balsam/server/models/crud/sessions.py @@ -149,7 +149,6 @@ def _footprint_func() -> Any: def acquire( db: Session, owner: schemas.UserOut, session_id: int, spec: schemas.SessionAcquire ) -> List[Dict[str, Any]]: - session = (owned_session_query(db, owner).filter(models.Session.id == session_id)).one() session.heartbeat = datetime.utcnow() diff --git a/balsam/site/launcher/_serial_mode_master.py b/balsam/site/launcher/_serial_mode_master.py index fa7b84d8..ec7a90f5 100644 --- a/balsam/site/launcher/_serial_mode_master.py +++ b/balsam/site/launcher/_serial_mode_master.py @@ -82,7 +82,7 @@ def update_job_states( for id in done_ids: self.status_updater.put(id, JobState.run_done, state_timestamp=now) - for (id, retcode, tail) in error_logs: + for id, retcode, tail in error_logs: self.status_updater.put( id, JobState.run_error, state_timestamp=now, state_data={"returncode": retcode, "error": tail} ) diff --git a/tests/benchmark/locustfile.py b/tests/benchmark/locustfile.py index 17118e27..418dc236 100644 --- a/tests/benchmark/locustfile.py +++ b/tests/benchmark/locustfile.py @@ -28,6 +28,7 @@ ) TEST_BALSAM_SERVER = os.environ.get("BALSAM_TESET_SERVER", "http://0.0.0.0:8000") + # overload client using username/password token authentication class LocustBalsamClientA(BasicAuthRequestsClient): def __init__(self, api_root: str, request_event: EventHook) -> None: @@ -236,7 +237,6 @@ def bulk_job_submission(self) -> None: # simulate runnings jobs in batches steps = int(len(jobs) / simulated_nodes) + 1 for step in range(steps): - # indices of jobs to operate on start = simulated_nodes * step end = min(simulated_nodes * (step + 1), len(jobs)) From 4ea2b3a965a91912d67a7d9ca553c29c2461d7c9 Mon Sep 17 00:00:00 2001 From: Thomas Uram Date: Mon, 6 Feb 2023 17:24:39 -0600 Subject: [PATCH 3/3] Lint fixes --- balsam/_api/models.py | 8 ++++---- balsam/site/launcher/_serial_mode_master.py | 18 +++++++++--------- balsam/site/launcher/_serial_mode_worker.py | 14 +++++++------- 3 files changed, 20 insertions(+), 20 deletions(-) diff --git a/balsam/_api/models.py b/balsam/_api/models.py index df8cf6a2..0c3fcdaa 100644 --- a/balsam/_api/models.py +++ b/balsam/_api/models.py @@ -1,5 +1,5 @@ # This file was auto-generated via /Users/turam/opt/miniconda3/bin/python balsam/schemas/api_generator.py -# [git rev fc34f9c] +# [git rev ce4bdce] # Do *not* make changes to the API by changing this file! import datetime @@ -765,7 +765,7 @@ class BatchJob(balsam._api.bases.BatchJobBase): job_mode = Field[balsam.schemas.batchjob.JobMode]() optional_params = Field[typing.Dict[str, str]]() filter_tags = Field[typing.Dict[str, str]]() - partitions = Field[Optional[typing.Union[typing.List[balsam.schemas.batchjob.BatchJobPartition], None]]]() + partitions = Field[typing.Optional[typing.List[balsam.schemas.batchjob.BatchJobPartition]]]() site_id = Field[int]() project = Field[str]() queue = Field[str]() @@ -786,7 +786,7 @@ def __init__( queue: str, optional_params: Optional[typing.Dict[str, str]] = None, filter_tags: Optional[typing.Dict[str, str]] = None, - partitions: Optional[typing.Union[typing.List[balsam.schemas.batchjob.BatchJobPartition], None]] = None, + partitions: Optional[typing.Optional[typing.List[balsam.schemas.batchjob.BatchJobPartition]]] = None, **kwargs: Any, ) -> None: """ @@ -918,7 +918,7 @@ def create( queue: str, optional_params: Optional[typing.Dict[str, str]] = None, filter_tags: Optional[typing.Dict[str, str]] = None, - partitions: Optional[typing.Union[typing.List[balsam.schemas.batchjob.BatchJobPartition], None]] = None, + partitions: Optional[typing.Optional[typing.List[balsam.schemas.batchjob.BatchJobPartition]]] = None, ) -> BatchJob: """ Create a new BatchJob object and save it to the API in one step. diff --git a/balsam/site/launcher/_serial_mode_master.py b/balsam/site/launcher/_serial_mode_master.py index ec7a90f5..558fdc9b 100644 --- a/balsam/site/launcher/_serial_mode_master.py +++ b/balsam/site/launcher/_serial_mode_master.py @@ -117,9 +117,9 @@ def acquire_jobs(self, max_jobs: int) -> List[Dict[str, Any]]: def handle_request(self) -> None: msg = self.socket.recv_json() - done_ids: List[int] = msg["done"] - error_logs: List[Tuple[int, int, str]] = msg["error"] - started_ids: List[int] = msg["started"] + done_ids: List[int] = msg["done"] # type: ignore + error_logs: List[Tuple[int, int, str]] = msg["error"] # type: ignore + started_ids: List[int] = msg["started"] # type: ignore self.update_job_states(done_ids, error_logs, started_ids) finished_ids = set(done_ids) | set(log[0] for log in error_logs) @@ -127,8 +127,8 @@ def handle_request(self) -> None: self.active_ids -= finished_ids self.num_outstanding_jobs -= len(finished_ids) - src = msg["source"] - max_jobs: int = msg["request_num_jobs"] + src = msg["source"] # type: ignore + max_jobs: int = msg["request_num_jobs"] # type: ignore logger.debug(f"Worker {src} requested {max_jobs} jobs") new_job_specs = self.acquire_jobs(max_jobs) @@ -154,9 +154,9 @@ def idle_check(self) -> None: def run(self) -> None: logger.debug("In master run") try: - self.context = zmq.Context() # type: ignore - self.context.setsockopt(zmq.LINGER, 0) # type: ignore - self.socket = self.context.socket(zmq.REP) # type: ignore + self.context = zmq.Context() + self.context.setsockopt(zmq.LINGER, 0) + self.socket = self.context.socket(zmq.REP) self.socket.bind(f"tcp://*:{self.master_port}") logger.debug("Master ZMQ socket bound.") @@ -173,7 +173,7 @@ def run(self) -> None: self.shutdown() self.socket.setsockopt(zmq.LINGER, 0) self.socket.close(linger=0) - self.context.term() # type: ignore + self.context.term() logger.info("shutdown done: ensemble master exit gracefully") def shutdown(self) -> None: diff --git a/balsam/site/launcher/_serial_mode_worker.py b/balsam/site/launcher/_serial_mode_worker.py index d8b8861d..b861d63a 100644 --- a/balsam/site/launcher/_serial_mode_worker.py +++ b/balsam/site/launcher/_serial_mode_worker.py @@ -150,7 +150,7 @@ def exit(self) -> None: self.cleanup_proc(id, timeout=self.CHECK_PERIOD) self.socket.setsockopt(zmq.LINGER, 0) self.socket.close(linger=0) - self.context.term() # type: ignore + self.context.term() def start_jobs(self) -> List[int]: started_ids = [] @@ -193,12 +193,12 @@ def cycle(self) -> bool: response_msg = self.socket.recv_json() logger.debug("Worker response received") - if response_msg.get("exit"): + if response_msg.get("exit"): # type: ignore logger.info(f"Worker {self.hostname} received exit message: break") return False - if response_msg.get("new_jobs"): - self.runnable_cache.update({job["id"]: job for job in response_msg["new_jobs"]}) + if response_msg.get("new_jobs"): # type: ignore + self.runnable_cache.update({job["id"]: job for job in response_msg["new_jobs"]}) # type: ignore logger.debug( f"{self.hostname} fraction available: {self.node_manager.aggregate_free_nodes()} " @@ -208,9 +208,9 @@ def cycle(self) -> bool: return True def run(self) -> None: - self.context = zmq.Context() # type: ignore - self.context.setsockopt(zmq.LINGER, 0) # type: ignore - self.socket = self.context.socket(zmq.REQ) # type: ignore + self.context = zmq.Context() + self.context.setsockopt(zmq.LINGER, 0) + self.socket = self.context.socket(zmq.REQ) self.socket.connect(self.master_address) logger.debug(f"Worker connected to {self.master_address}")