Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make forwardmodelrunner async #9198

Open
wants to merge 23 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
fix tests
  • Loading branch information
jonathan-eq committed Nov 26, 2024
commit e299ba62a4850ddb5c510335242b6d2ffb991f7c
21 changes: 10 additions & 11 deletions src/_ert/forward_model_runner/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
import os
import signal
import sys
import time
import typing
from datetime import datetime

Expand Down Expand Up @@ -72,21 +71,21 @@ def _setup_logging(directory: str = "logs"):
JOBS_JSON_RETRY_TIME = 30


def _wait_for_retry():
time.sleep(JOBS_JSON_RETRY_TIME)
async def _wait_for_retry():
Copy link
Contributor

@xjules xjules Nov 25, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wondering if we need this helper function at all?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, we need it for one of the tests: `test_job_dispatch.py::test_retry_of_jobs_json_file_read`.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm, this usage of that function is a bit strange though.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We mock it with `lock.acquire` in that test, so that execution blocks here.

await asyncio.sleep(JOBS_JSON_RETRY_TIME)


def _read_jobs_file(retry=True):
async def _read_jobs_file(retry=True):
try:
with open(JOBS_FILE, "r", encoding="utf-8") as json_file:
with open(JOBS_FILE, "r", encoding="utf-8") as json_file: # noqa: ASYNC230
return json.load(json_file)
except json.JSONDecodeError as e:
raise IOError("Job Runner cli failed to load JSON-file.") from e
except FileNotFoundError as e:
if retry:
logger.error(f"Could not find file {JOBS_FILE}, retrying")
_wait_for_retry()
return _read_jobs_file(retry=False)
await _wait_for_retry()
return await _read_jobs_file(retry=False)
else:
raise e

Expand Down Expand Up @@ -119,7 +118,7 @@ async def main(args):
# Make sure that logging is setup _after_ we have moved to the runpath directory
_setup_logging()

jobs_data = _read_jobs_file()
jobs_data = await _read_jobs_file()

experiment_id = jobs_data.get("experiment_id")
ens_id = jobs_data.get("ens_id")
Expand Down Expand Up @@ -167,9 +166,9 @@ async def _main(
print(
f"job_dispatch failed due to {oserror}. Stopping and cleaning up."
)
return
raise SystemExit(1)

if isinstance(job_status, Finish) and not job_status.success():
return
raise SystemExit(1)
except asyncio.CancelledError:
pass
raise SystemExit(1)
16 changes: 12 additions & 4 deletions src/_ert/forward_model_runner/reporting/event.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,13 +76,15 @@ def __init__(self, evaluator_url, token=None, cert_path=None):
self._ens_id = None
self._real_id = None
self._event_queue: asyncio.Queue[events.Event | EventSentinel] = asyncio.Queue()
# self._event_publisher_thread = ErtThread(target=self._event_publisher)
self._timeout_timestamp = None
self._timestamp_lock = threading.Lock()
# seconds to timeout the reporter the thread after Finish() was received
self._reporter_timeout = 60
self._running = True
self._event_publishing_task = asyncio.create_task(self.async_event_publisher())
self._event_publisher_ready = asyncio.Event()

async def join(self) -> None:
await self._event_publishing_task

async def async_event_publisher(self):
logger.debug("Publishing event.")
Expand All @@ -91,8 +93,9 @@ async def async_event_publisher(self):
token=self._token,
cert=self._cert,
) as client:
self._event_publisher_ready.set()
event = None
while self._running:
while True:
with self._timestamp_lock:
if (
self._timeout_timestamp is not None
Expand All @@ -103,14 +106,17 @@ async def async_event_publisher(self):
if event is None:
# if we successfully sent the event we can proceed
# to next one
print("GETTING MORE EVENTS!")
event = await self._event_queue.get()
if event is self._sentinel:
self._event_queue.task_done()
print("NEW EVENT WAS SENTINEL :))")
return
break
try:
await client.send(event_to_json(event))
self._event_queue.task_done()
event = None
print("Sent event :)")
except ClientConnectionError as exception:
# Possible intermittent failure, we retry sending the event
logger.error(str(exception))
Expand All @@ -122,9 +128,11 @@ async def async_event_publisher(self):
break

async def report(self, msg):
await self._event_publisher_ready.wait()
await self._statemachine.transition(msg)

async def _dump_event(self, event: events.Event):
print(f"DUMPED EVENT {type(event)=}")
logger.debug(f'Schedule "{type(event)}" for delivery')
await self._event_queue.put(event)

Expand Down
2 changes: 1 addition & 1 deletion src/_ert/forward_model_runner/reporting/statemachine.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,6 @@ async def transition(self, message: Message):
f"Illegal transition {self._state} -> {new_state} for {message}, "
f"expected to transition into {self._transitions[self._state]}"
)

print(f"TRANSITIONING STATE W/{message=}")
await self._handler[new_state](message)
self._state = new_state
Loading