Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

libflux: add flux_send_new() #5499

Merged
merged 7 commits into from
Oct 13, 2023
Merged

Conversation

garlick
Copy link
Member

@garlick garlick commented Oct 12, 2023

Problem: it's a common pattern to create a message, send it with with flux_send(), then destroy it; however, there is no interface to allow the message ownership to be transferred rather than copied.

Add flux_send_new() which accepts a non-const flux_msg_t ** and implement it in the interthread connector. Now messages can pass from one end to the other without costly duplication.

Use this function in the common flux_rpc() and flux_respond() family of functions.

This is marked WIP for now while I consider whether it's possible to use this in the broker along the message path. It would be neat if rpcs between broker modules could transit the broker without copying!

Even just this change does seem to have a positive impact on job throughput.

Problem: some older rpc and response handling code does
not conform to modern project norms.

Make the following changes
- break long compound conditinals to one per line
- break long parameter lists to one per line
@garlick garlick changed the title WIP: libflux: add flux_send_new() libflux: add flux_send_new() Oct 13, 2023
@garlick
Copy link
Member Author

garlick commented Oct 13, 2023

I corrected problem where I was updating a message reference count outside of the lock when it should have been inside.
Also added some unit tests.

Dropping the WIP since I think any work in the broker is going to need to be its own PR.

@garlick
Copy link
Member Author

garlick commented Oct 13, 2023

Looks like caliper is always enabled in CI. When caliper is enabled, flux_send_new() was falling back to flux_send(), which caused one of the unit tests to fail and would have prevented coverage of the new code.

To address this I just moved the call to profiling_msg_snapshot() to before the send call so it could be used in flux_send_new() without a use-after-free. I think this is OK. If the send fails, it means caliper might see an RPC request without a response, but of course requests and responses are not evenly matched in flux anyway...

Problem: it's a common pattern to create a message, send it
with with flux_send(), then destroy it; however, there is no
interface to allow flux_send() to just take over the message
and thereby avoid a costly message copy if/when possible.

Add flux_send_new() which accepts a non-const flux_msg_t **.
The reference count must be 1, and on success *msg is set to NULL,
so there should be no possibility of the message being reused.

This just calls flux_send() internally if the connector does not
implement op->send_new(), or if other optional things in the
environment make calling op->send_new() unworkable (such as
RPC tracking which wants to temporarily hold copy of each request
until a response is received).

Move the call to profiling_msg_snapshot() to before the send rather
than after so send_new() doesn't have to fall back to send() when
flux is built with caliper profiling.
Problem: several well traveled code paths in libflux call flux_send()
then immediately destroy the message, thereby not communicating that
the message need not be copied before modification.

Call flux_send_new() in rpc, response, and logging code.
@grondo
Copy link
Contributor

grondo commented Oct 13, 2023

When caliper is enabled, flux_send_new() was falling back to flux_send(),

Oh, I was wondering why the function always fell back to flux_send() if HAVE_CALIPER was enabled. Just out of curiosity, why was that?

@garlick
Copy link
Member Author

garlick commented Oct 13, 2023

The call to profiling_msg_snapshot() was being called after the send, and in the send_new case, the message is no longer available after the send. My realization was that it probably doesn't need to be called after the send.

@codecov
Copy link

codecov bot commented Oct 13, 2023

Codecov Report

Merging #5499 (4898293) into master (cbd871d) will decrease coverage by 0.03%.
The diff coverage is 91.80%.

@@            Coverage Diff             @@
##           master    #5499      +/-   ##
==========================================
- Coverage   83.69%   83.66%   -0.03%     
==========================================
  Files         484      484              
  Lines       81561    81537      -24     
==========================================
- Hits        68261    68218      -43     
- Misses      13300    13319      +19     
Files Coverage Δ
src/broker/module.c 78.34% <ø> (-0.14%) ⬇️
src/common/libflux/rpc.c 90.99% <100.00%> (+0.12%) ⬆️
src/common/libflux/flog.c 84.21% <66.66%> (-1.88%) ⬇️
src/common/libflux/connector_interthread.c 87.75% <88.23%> (+0.57%) ⬆️
src/common/libflux/handle.c 86.48% <86.36%> (-0.08%) ⬇️
src/common/libflux/response.c 79.18% <90.69%> (-0.38%) ⬇️

... and 13 files with indirect coverage changes

@grondo
Copy link
Contributor

grondo commented Oct 13, 2023

Ah, so the connector implementation of send_new() is required to destroy the message, it isn't done in the flux_send_new() wrapper? (and would it improve things if it were done that way?)

@garlick
Copy link
Member Author

garlick commented Oct 13, 2023

No the send_new() implementation doesn't destroy the message. It takes ownership from the sender and transfers it to the other end of the channel where it's used by the receiver.

@grondo
Copy link
Contributor

grondo commented Oct 13, 2023

Ok, thanks! That makes sense now.

Copy link
Contributor

@grondo grondo left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM!

Problem: the interthread connector plugin does not implement
the send_new() operation, which can reduce message copying.

Add a send_new() operation.
Problem: there is no unit test coverage for flux_send_new().

Add some tests to the handle unit test.
Problem: the broker module internally defines a message credential
that is no longer used, now that the interthread connector does that.

Drop it.
Problem: there is no man page for flux_send_new().

Add it to flux_send.rst and configure a stub to be produced for
flux_send_new().

The previous synopsis function prototypes were not within a literal
block and had to be escaped.  Fix that while adding the new function.
@garlick
Copy link
Member Author

garlick commented Oct 13, 2023

Thanks! I'll set MWP.

@garlick
Copy link
Member Author

garlick commented Oct 13, 2023

Got a couple of test failures in the el8,coverage builder - likely not related to this PR?

2023-10-13T16:56:53.4553772Z ##[error]not ok 8 - 0002-exec-with-imp.t: flux exec --with-imp forwards signals
2023-10-13T16:56:53.4558585Z ##[error]not ok 15 - 0004-recovery.t: flux start --recover works
2023-10-13T16:56:53.4561770Z ##[error]ERROR: t9000-system.t - exited with status 1

and nothing else really to go on in the test logs. I'll retry.

@mergify mergify bot merged commit f6547f4 into flux-framework:master Oct 13, 2023
30 checks passed
@garlick garlick deleted the send_new branch October 13, 2023 17:44
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants