Commit

Lint fixes
Thomas Uram committed Feb 6, 2023
1 parent ce4bdce commit 4ea2b3a
Showing 3 changed files with 20 additions and 20 deletions.
8 changes: 4 additions & 4 deletions balsam/_api/models.py
@@ -1,5 +1,5 @@
# This file was auto-generated via /Users/turam/opt/miniconda3/bin/python balsam/schemas/api_generator.py
-# [git rev fc34f9c]
+# [git rev ce4bdce]
# Do *not* make changes to the API by changing this file!

import datetime
@@ -765,7 +765,7 @@ class BatchJob(balsam._api.bases.BatchJobBase):
job_mode = Field[balsam.schemas.batchjob.JobMode]()
optional_params = Field[typing.Dict[str, str]]()
filter_tags = Field[typing.Dict[str, str]]()
-partitions = Field[Optional[typing.Union[typing.List[balsam.schemas.batchjob.BatchJobPartition], None]]]()
+partitions = Field[typing.Optional[typing.List[balsam.schemas.batchjob.BatchJobPartition]]]()
site_id = Field[int]()
project = Field[str]()
queue = Field[str]()
@@ -786,7 +786,7 @@ def __init__(
queue: str,
optional_params: Optional[typing.Dict[str, str]] = None,
filter_tags: Optional[typing.Dict[str, str]] = None,
-partitions: Optional[typing.Union[typing.List[balsam.schemas.batchjob.BatchJobPartition], None]] = None,
+partitions: Optional[typing.Optional[typing.List[balsam.schemas.batchjob.BatchJobPartition]]] = None,
**kwargs: Any,
) -> None:
"""
@@ -918,7 +918,7 @@ def create(
queue: str,
optional_params: Optional[typing.Dict[str, str]] = None,
filter_tags: Optional[typing.Dict[str, str]] = None,
-partitions: Optional[typing.Union[typing.List[balsam.schemas.batchjob.BatchJobPartition], None]] = None,
+partitions: Optional[typing.Optional[typing.List[balsam.schemas.batchjob.BatchJobPartition]]] = None,
) -> BatchJob:
"""
Create a new BatchJob object and save it to the API in one step.
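
A note on the models.py change above: the generated annotation for partitions moves from Optional[typing.Union[typing.List[...], None]] to typing.Optional[typing.List[...]]. The two spellings are interchangeable, since Optional[X] is defined as Union[X, None] and Union drops the duplicate None. A minimal sketch of that equivalence, using a placeholder element type because BatchJobPartition is not needed to show it:

from typing import List, Optional, Union

Partition = dict  # placeholder for balsam.schemas.batchjob.BatchJobPartition

# Both annotations denote "a list of partitions, or None".
OldStyle = Optional[Union[List[Partition], None]]
NewStyle = Optional[List[Partition]]

# typing normalizes Union[X, None] and drops the redundant None,
# so the two forms compare equal at runtime.
assert OldStyle == NewStyle
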
18 changes: 9 additions & 9 deletions balsam/site/launcher/_serial_mode_master.py
@@ -117,18 +117,18 @@ def acquire_jobs(self, max_jobs: int) -> List[Dict[str, Any]]:
def handle_request(self) -> None:
msg = self.socket.recv_json()

-done_ids: List[int] = msg["done"]
-error_logs: List[Tuple[int, int, str]] = msg["error"]
-started_ids: List[int] = msg["started"]
+done_ids: List[int] = msg["done"] # type: ignore
+error_logs: List[Tuple[int, int, str]] = msg["error"] # type: ignore
+started_ids: List[int] = msg["started"] # type: ignore
self.update_job_states(done_ids, error_logs, started_ids)

finished_ids = set(done_ids) | set(log[0] for log in error_logs)
self.active_ids |= set(started_ids)
self.active_ids -= finished_ids
self.num_outstanding_jobs -= len(finished_ids)

-src = msg["source"]
-max_jobs: int = msg["request_num_jobs"]
+src = msg["source"] # type: ignore
+max_jobs: int = msg["request_num_jobs"] # type: ignore
logger.debug(f"Worker {src} requested {max_jobs} jobs")
new_job_specs = self.acquire_jobs(max_jobs)

@@ -154,9 +154,9 @@ def idle_check(self) -> None:
def run(self) -> None:
logger.debug("In master run")
try:
-self.context = zmq.Context() # type: ignore
-self.context.setsockopt(zmq.LINGER, 0) # type: ignore
-self.socket = self.context.socket(zmq.REP) # type: ignore
+self.context = zmq.Context()
+self.context.setsockopt(zmq.LINGER, 0)
+self.socket = self.context.socket(zmq.REP)
self.socket.bind(f"tcp://*:{self.master_port}")
logger.debug("Master ZMQ socket bound.")

@@ -173,7 +173,7 @@ def run(self) -> None:
self.shutdown()
self.socket.setsockopt(zmq.LINGER, 0)
self.socket.close(linger=0)
-self.context.term() # type: ignore
+self.context.term()
logger.info("shutdown done: ensemble master exit gracefully")

def shutdown(self) -> None:
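
A note on the _serial_mode_master.py change above: the added type: ignore comments sit on lines that unpack the result of socket.recv_json(), whose return type mypy cannot narrow to the concrete message shape the master expects. A minimal sketch of an alternative, not taken from this commit and assuming the messages are JSON objects, is to cast the payload to a dict once so the per-line ignores become unnecessary:

from typing import Any, Dict, List, Tuple, cast

import zmq


def unpack_request(socket: "zmq.Socket") -> Tuple[List[int], List[Tuple[int, int, str]], List[int]]:
    # Hypothetical helper mirroring Master.handle_request: cast the JSON
    # payload once instead of ignoring each assignment.
    msg = cast(Dict[str, Any], socket.recv_json())
    done_ids: List[int] = msg["done"]
    error_logs: List[Tuple[int, int, str]] = msg["error"]
    started_ids: List[int] = msg["started"]
    return done_ids, error_logs, started_ids
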
14 changes: 7 additions & 7 deletions balsam/site/launcher/_serial_mode_worker.py
@@ -150,7 +150,7 @@ def exit(self) -> None:
self.cleanup_proc(id, timeout=self.CHECK_PERIOD)
self.socket.setsockopt(zmq.LINGER, 0)
self.socket.close(linger=0)
-self.context.term() # type: ignore
+self.context.term()

def start_jobs(self) -> List[int]:
started_ids = []
Expand Down Expand Up @@ -193,12 +193,12 @@ def cycle(self) -> bool:
response_msg = self.socket.recv_json()
logger.debug("Worker response received")

if response_msg.get("exit"):
if response_msg.get("exit"): # type: ignore
logger.info(f"Worker {self.hostname} received exit message: break")
return False

if response_msg.get("new_jobs"):
self.runnable_cache.update({job["id"]: job for job in response_msg["new_jobs"]})
if response_msg.get("new_jobs"): # type: ignore
self.runnable_cache.update({job["id"]: job for job in response_msg["new_jobs"]}) # type: ignore

logger.debug(
f"{self.hostname} fraction available: {self.node_manager.aggregate_free_nodes()} "
return True

def run(self) -> None:
-self.context = zmq.Context() # type: ignore
-self.context.setsockopt(zmq.LINGER, 0) # type: ignore
-self.socket = self.context.socket(zmq.REQ) # type: ignore
+self.context = zmq.Context()
+self.context.setsockopt(zmq.LINGER, 0)
+self.socket = self.context.socket(zmq.REQ)
self.socket.connect(self.master_address)
logger.debug(f"Worker connected to {self.master_address}")

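
A note on the zmq-related lines in both launcher files: dropping type: ignore from the zmq.Context(), setsockopt, socket, and term() calls suggests the project now type-checks against a pyzmq release that ships its own annotations, so mypy accepts these calls directly. A minimal sketch of the connection setup those lines describe, with hypothetical class and attribute names trimmed down from the worker above:

import zmq


class WorkerConnection:
    context: zmq.Context
    socket: zmq.Socket

    def __init__(self, master_address: str) -> None:
        # Same call sequence as Worker.run in the diff above.
        self.context = zmq.Context()
        self.context.setsockopt(zmq.LINGER, 0)
        self.socket = self.context.socket(zmq.REQ)
        self.socket.connect(master_address)

    def close(self) -> None:
        # Same teardown as Worker.exit in the diff above.
        self.socket.setsockopt(zmq.LINGER, 0)
        self.socket.close(linger=0)
        self.context.term()
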
