Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce non-eager task-submission system #91

Merged
merged 26 commits into from
Nov 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
542b7a1
First non-eager implementation
ccuetom Oct 29, 2024
a4627bf
Add last-ditch attempt to recover task
ccuetom Oct 31, 2024
c4e3738
Improve memory information
ccuetom Oct 31, 2024
a18a1c5
Added fast acqs loading
ccuetom Oct 31, 2024
0d0c7b1
Support for compressing StructuredData
ccuetom Nov 1, 2024
2c97081
Initial implementation of a remote adjoint
ccuetom Nov 1, 2024
ebaab8d
Added anon task interface
ccuetom Nov 4, 2024
bd3060a
Re-implementation of the parallel adjoint
ccuetom Nov 6, 2024
f7e82b8
Force gradient reference in adjoint
ccuetom Nov 6, 2024
59ef9fc
Fix conventional adjoint
ccuetom Nov 7, 2024
b4e2f63
Avoid dict population in sub-calls
ccuetom Nov 7, 2024
9c4bbe0
Keep reference to pending warehouse tasks
ccuetom Nov 7, 2024
7f528a9
Prevent race condition on warehouse objects
ccuetom Nov 7, 2024
95c3686
Prevent warehouse object leakage
ccuetom Nov 7, 2024
832ba43
Introduced TaskArray
ccuetom Nov 11, 2024
30bbc5f
Improve serialisation
ccuetom Nov 11, 2024
6b7c190
Reduced cache warehouse leak
ccuetom Nov 12, 2024
32e39e9
Added residual downsampling
ccuetom Nov 13, 2024
353d099
Minor improvements
ccuetom Nov 14, 2024
c58a7ba
Make test default run method
ccuetom Nov 14, 2024
3bb49b1
Refactored OptimisationLoop dumping
ccuetom Nov 15, 2024
0f23542
Prevent ValueError in task initialisation
ccuetom Nov 15, 2024
13ec2ea
Minor fix
ccuetom Nov 15, 2024
61343eb
Minor fixes
ccuetom Nov 15, 2024
db6019c
Improved array initialisation
ccuetom Nov 19, 2024
ca9e294
Avoid race condition in task initialisation
ccuetom Nov 19, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion environment.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
name: stride
channels:
- conda-forge
- defaults
dependencies:
- python>=3.8, <3.12
- blosc
Expand Down
2 changes: 1 addition & 1 deletion mosaic/comms/comms.py
Original file line number Diff line number Diff line change
Expand Up @@ -780,7 +780,7 @@ def _process_send(self, method, cmd=None, reply=False, **kwargs):

self.logger.debug('Sending cmd %s %s to %s (%s) from %s '
'(size %.2f MB)' % (method, cmd['method'], self.uid, cmd['uid'],
self._runtime.uid, msg_size/1024**2))
self._runtime.uid, msg_size/1024**2))
else:
self.logger.debug('Sending msg %s to %s from %s '
'(size %.2f MB)' % (method, self.uid, self._runtime.uid,
Expand Down
26 changes: 26 additions & 0 deletions mosaic/core/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -228,11 +228,17 @@ def cmd(self, method, *args, **kwargs):

"""
wait = kwargs.pop('wait', False)
silence = kwargs.pop('silence', None)
restrict = kwargs.pop('restrict', None)

remotes, cmd = self._prepare_cmd(method, *args, **kwargs)

result = []
for remote in remotes:
if silence is not None and remote.uid in silence:
continue
if restrict is not None and remote.uid not in restrict:
continue
result.append(remote.cmd(**cmd, wait=wait, as_async=False))

if len(result) == 1:
Expand All @@ -259,11 +265,17 @@ def cmd_recv(self, method, *args, **kwargs):

"""
wait = kwargs.pop('wait', False)
silence = kwargs.pop('silence', None)
restrict = kwargs.pop('restrict', None)

remotes, cmd = self._prepare_cmd(method, *args, **kwargs)

result = []
for remote in remotes:
if silence is not None and remote.uid in silence:
continue
if restrict is not None and remote.uid not in restrict:
continue
result.append(remote.cmd(**cmd, wait=wait, reply=True, as_async=False))

if len(result) == 1:
Expand All @@ -289,10 +301,17 @@ async def cmd_async(self, method, *args, **kwargs):
asyncio.Future

"""
silence = kwargs.pop('silence', None)
restrict = kwargs.pop('restrict', None)

remotes, cmd = self._prepare_cmd(method, *args, **kwargs)

result = []
for remote in remotes:
if silence is not None and remote.uid in silence:
continue
if restrict is not None and remote.uid not in restrict:
continue
result.append(await remote.cmd(**cmd))

if len(result) == 1:
Expand All @@ -318,10 +337,17 @@ async def cmd_recv_async(self, method, *args, **kwargs):
asyncio.Future

"""
silence = kwargs.pop('silence', None)
restrict = kwargs.pop('restrict', None)

remotes, cmd = self._prepare_cmd(method, *args, **kwargs)

result = []
for remote in remotes:
if silence is not None and remote.uid in silence:
continue
if restrict is not None and remote.uid not in restrict:
continue
result.append(await remote.cmd(**cmd, reply=True))

if len(result) == 1:
Expand Down
Loading
Loading