Transport & Engine: AsyncTransport plugin #6626
base: main
Codecov Report
Attention: Patch coverage is
Additional details and impacted files
@@ Coverage Diff @@
## main #6626 +/- ##
==========================================
+ Coverage 77.92% 77.97% +0.06%
==========================================
Files 563 564 +1
Lines 41671 42418 +747
==========================================
+ Hits 32467 33072 +605
- Misses 9204 9346 +142
Thanks! Looks good. Just to reiterate the most important comments:
Why don't you just use Transport instead of BlockingTransport, since you set one to the other? Now you have redundancy. I feel like this API is clear to me:
_BaseTransport -> Transport -> SshTransport
_BaseTransport -> AsyncTransport -> AsyncSshTransport
Will you make a PR in plumpy so we can do a new release?
I will review the tests in the separate PR.
@@ -119,7 +120,7 @@ pillow==10.1.0
 platformdirs==3.11.0
 plotly==5.17.0
 pluggy==1.3.0
-plumpy==0.22.3
+plumpy@git+https://github.com/khsrali/plumpy.git@allow-async-upload-download#egg=plumpy
Will you make a PR there so we can do a new release?
yes! Please review here: aiidateam/plumpy#272
utils/dependency_management.py (Outdated)
if (
    canonicalize_name(requirement_abstract.name) == canonicalize_name(requirement_concrete.name)
    and abstract_contains
):
Do we remove this before merge? Otherwise it would be good to add a comment explaining what the new if-else does. It is hard to understand without context.
I plan to keep it, as it's very useful for passing CI when we make PRs like this one that are hooked to another PR, or to a branch of another repo, with @.
The problem is that @ is not listed as a valid specifier in class Specifier.
This little change basically accepts @ as a valid specifier and checks that a hooked dependency points to the same "version" across all files (requirement-xx, environment.yml, etc.).
This way, apart from this nice check, the dependency test fails and it still triggers the main unit tests (test-presto, test-3.xx) for such PRs; otherwise it wouldn't.
I added a few lines of comment to clarify this
Thanks!
This is nice, but perhaps it would be better to separate it into a standalone PR for visibility.
btw: I started looking into using a uv lockfile in #6640, which seems like a better strategy than having to wrangle 4 different requirements files. :-)
As we discussed, this feature is already covered in the new PR #6640.
So I will keep the changes temporarily for this PR only, and will revert 'utils/dependency_management.py' before any merge.
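The idea discussed in this thread, treating a name@url pin as something that can be compared across requirement files, can be sketched roughly as follows. This is not the code from utils/dependency_management.py: the helpers canonical, parse_pin and same_pin are hypothetical, use only the standard library, and handle just the two forms name==version and name@url.

```python
import re
from typing import Optional, Tuple


def canonical(name: str) -> str:
    """Normalize a project name the way PEP 503 describes."""
    return re.sub(r"[-_.]+", "-", name).lower()


def parse_pin(line: str) -> Tuple[str, Optional[str], Optional[str]]:
    """Split a requirement line into (name, version, url).

    Only 'name==version' and 'name@url' are handled here; real
    requirement syntax (extras, markers, ranges) is much richer.
    """
    line = line.strip()
    if "@" in line:
        # Split on the first '@' only: the URL itself may contain '@branch'.
        name, url = line.split("@", 1)
        return canonical(name.strip()), None, url.strip()
    name, _, version = line.partition("==")
    return canonical(name.strip()), version.strip() or None, None


def same_pin(first: str, second: str) -> bool:
    """True if both lines pin the same project to the same version or URL."""
    return parse_pin(first) == parse_pin(second)
```

A consistency check over several files would then call same_pin on each pair of lines that name the same project, and report a mismatch when one file still carries the released version while another points at a branch.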
    return str(path)


class _BaseTransport:
Isn't this part of the public API? Should I use it if I create a new transport plugin? Or should I use Transport?
No, this is private. No one should inherit from this except 'AsyncTransport' and 'BlockingTransport'.
Only 'AsyncTransport' and 'BlockingTransport' are public; they are the ones to use when creating a new plugin.
src/aiida/transports/transport.py (Outdated)


# This is here for backwards compatibility
Transport = BlockingTransport
I don't know if it makes sense to make blocking the default one, especially if you expose both of them in the API. Shouldn't there be one public class for blocking and one for non-blocking transports, which one should inherit from?
This was just for backward compatibility, as Giovanni suggested renaming the former blocking Transport to BlockingTransport.
@@ -164,7 +167,8 @@ def test_upload_local_copy_list(
     calc_info.local_copy_list = [[folder.uuid] + local_copy_list]

     with node.computer.get_transport() as transport:
-        execmanager.upload_calculation(node, transport, calc_info, fixture_sandbox)
+        runner = get_manager().get_runner()
+        runner.loop.run_until_complete(execmanager.upload_calculation(node, transport, calc_info, fixture_sandbox))
why is this needed now?
Because execmanager.upload_calculation is now an async function. This way we can call it in a sync test.
What happens if you use the old way? The test just passes and continues before finishing the command?
I think it is very tricky to mix async programming with sync functions; it is in general a very hard problem. It looks to me like runner.loop.run_until_complete will block until the task completes, so it gives no benefit after making these methods async. Is create_task the correct thing to use?
Okay, I just asked Ali offline. This is only for tests, and only to check that the implementation is functionally correct. The async behavior of the four operations working together is not the purpose here.
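The pattern discussed in this thread can be illustrated with plain asyncio. The sketch below does not use the AiiDA runner; upload_calculation here is a hypothetical stand-in coroutine and run_sync is a hypothetical helper. It shows two things: calling an async function without awaiting it only creates a coroutine object (the body does not run), and run_until_complete drives the coroutine to completion from synchronous code, blocking until the result is available.

```python
import asyncio


async def upload_calculation(name: str) -> str:
    """Hypothetical stand-in for an async upload; sleeps instead of doing I/O."""
    await asyncio.sleep(0.01)
    return f"uploaded {name}"


def run_sync(coro):
    """Drive a coroutine to completion from synchronous code (e.g. a sync test)."""
    loop = asyncio.new_event_loop()
    try:
        return loop.run_until_complete(coro)  # blocks until the coroutine finishes
    finally:
        loop.close()


# Calling the function "the old way" does not execute its body:
pending = upload_calculation("calc-0")
is_coroutine = asyncio.iscoroutine(pending)
pending.close()  # discard it cleanly to avoid a "never awaited" warning

# Driving it through the loop actually performs the work:
result = run_sync(upload_calculation("calc-1"))
```

Blocking like this is acceptable in a test that only checks functional correctness; it is the concurrent scheduling of several such coroutines on one loop that gives the non-blocking benefit.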
@@ -86,3 +86,24 @@ def copy_from_remote_to_remote(transportsource, transportdestination, remotesour
     .. note:: it uses the method transportsource.copy_from_remote_to_remote
     """
     transportsource.copy_from_remote_to_remote(transportdestination, remotesource, remotedestination, **kwargs)
+
+
+async def copy_from_remote_to_remote_async(
Is this required in the utils? I don't find any usage
Not sure how it's used, tbh, probably by external plugins? So far I just provide similar functionality to copy_from_remote_to_remote
Okay, this is something that might be cleaned up in the future, but for this PR it does not make much sense.
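For readers who have not seen the helper, here is a rough sketch of what an async counterpart that mirrors the sync copy_from_remote_to_remote might look like. The diff above only shows the signature's first line, so the body is an assumption modelled on the sync version, and FakeAsyncTransport, with its copy_from_remote_to_remote_async method, is a hypothetical stand-in rather than a real transport.

```python
import asyncio
from typing import Any, List, Tuple


class FakeAsyncTransport:
    """Hypothetical stand-in that records copy requests instead of moving data."""

    def __init__(self) -> None:
        self.calls: List[Tuple[str, str, dict]] = []

    async def copy_from_remote_to_remote_async(
        self, transportdestination: Any, remotesource: str, remotedestination: str, **kwargs: Any
    ) -> None:
        await asyncio.sleep(0)  # a real transport would await network I/O here
        self.calls.append((remotesource, remotedestination, kwargs))


async def copy_from_remote_to_remote_async(
    transportsource, transportdestination, remotesource, remotedestination, **kwargs
):
    """Assumed body: delegate to the source transport, awaiting instead of blocking."""
    await transportsource.copy_from_remote_to_remote_async(
        transportdestination, remotesource, remotedestination, **kwargs
    )
```

The only difference from the sync helper in this sketch is the await, which lets the event loop run other tasks while the copy is in flight.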
I am about to finish #6627, which I think can benefit the tests here as well. Please hold off a bit for that. I'll try my best to get that one merged by Wednesday.
I just followed what @giovannipizzi suggested. But agreed, this makes more sense, so I'm going to apply these changes.
Will do once my performance tests are ready.
Note to myself: |
some minor changes
Note: |
Checklist:
|
Hi @khsrali, I merged #6640, so it should work now, I guess. Can you resolve the conflict and try again? Thanks.
Thanks @unkcpz , now I face issues I never had before, lol:
actually I even tried to update the file using 'uv lock', still won't pass.. |
Sorry for the experience. We are now trying
I will push the fix now, but I basically only ran these two commands:
uv add git+https://github.com/aiidateam/plumpy --branch async-run
uv add git+https://github.com/ronf/asyncssh --rev 033ef54302b2b09d496d68ccf39778b9e5fc89e2
@agoscinski It would be nice to have it merged by the end of this week, because when I come back from holidays, |
This PR proposes many changes to make transport tasks asynchronous. This ensures that the daemon won’t be blocked by time-consuming tasks such as uploads, downloads, and similar operations, requested by @giovannipizzi.
Here’s a summary of the main updates:
- […] AsyncSshTransport with the entry point core.ssh_async.
- […] AsyncSshTransport supports executing custom scripts before connections, which is particularly useful for authentication. 🥇
- […] transport.chdir() and transport.getcwd() (merged in "Transport & Engine: factor out getcwd() & chdir() for compatibility with upcoming async transport" #6594).
- […] AsyncSshTransport.
- […] Transport class. Introduces _BaseTransport, Transport, and AsyncTransport as replacements.
- […] Transport, while asynchronous ones should inherit from AsyncSshTransport.
- […] test_all_plugins.py to reflect these changes. Unfortunately, existing tests for transport plugins remain minimal and need improvement in a separate PR (TODO).
- […] TransportPath type and upgrades transport plugins to work with Union[str, Path, PurePosixPath].
- […] copy_from_remote_to_remote_async, addressing a previous issue where such tasks blocked the entire daemon.

Dependencies: This PR relies on PR 272 in plumpy.
Note: The initial commits by Chris were pulled from #6079 (closed).
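As a small illustration of the TransportPath item in the summary above: a union type of this shape lets plugin methods accept either strings or path objects and normalize them once at the boundary. The alias below follows the Union given in the summary; the helper name path_to_str is hypothetical, and its body simply mirrors the "return str(path)" line visible in the transport.py diff earlier in this review.

```python
from pathlib import Path, PurePosixPath
from typing import Union

# Alias as described in the PR summary.
TransportPath = Union[str, Path, PurePosixPath]


def path_to_str(path: TransportPath) -> str:
    """Normalize any accepted path type to a plain string (hypothetical helper)."""
    return str(path)


# Both spellings of the same remote path normalize to the same string.
as_string = path_to_str("/scratch/job-1")
as_posix = path_to_str(PurePosixPath("/scratch") / "job-1")
```

Normalizing at the entry point keeps the rest of a plugin free of isinstance checks, at the cost of losing the richer path API further down.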
Test Results: Performance Comparisons

When core.ssh_async Outperforms
In scenarios where the daemon is blocked by heavy transfer tasks (uploading/downloading/copying large files), core.ssh_async shows significant improvement.
For example, I submitted two WorkGraphs:
- […] touch file.
The time taken until the submit command is processed (with one daemon running):
- core.ssh_async: Only 4 seconds! 🚀🚀🚀🚀 A major improvement!
- core.ssh: 108 seconds (WorkGraph 1 fully completes before processing the second).

When core.ssh_async and core.ssh Are Comparable
For tasks involving both (and many!) uploads and downloads (a common scenario), performance varies slightly depending on the case.

Large Files (~1 GB):
core.ssh_async performs better due to simultaneous uploads and downloads. In some networks, this can almost double the bandwidth, as demonstrated in the graph below. My bandwidth is 11.8 MB/s but increased to nearly double under favorable conditions:
However, under heavy network load, bandwidth may revert to its base level (e.g., 11.8 MB/s):
Test Case: Two WorkGraphs: one uploads 1 GB, the other retrieves 1 GB using RemoteData.
- core.ssh_async: 120 seconds
- core.ssh: 204 seconds

Small Files (Many Small Transfers):
- core.ssh_async: 105 seconds
- core.ssh: 65 seconds
In this scenario, the overhead of asynchronous calls seems to outweigh the benefits. We need to discuss the trade-offs and explore possible optimizations. As @agoscinski mentioned, this might be expected, see here async overheads.